1use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::iter;
13use std::num::{NonZeroI64, NonZeroUsize};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use futures::future::{BoxFuture, FutureExt};
19use futures::stream::FuturesOrdered;
20use futures::{Future, StreamExt, future};
21use itertools::Itertools;
22use maplit::btreeset;
23use mz_adapter_types::compaction::CompactionWindow;
24use mz_adapter_types::connection::ConnectionId;
25use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
26use mz_catalog::memory::objects::{
27 CatalogItem, Cluster, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
28};
29use mz_cloud_resources::VpcEndpointConfig;
30use mz_controller_types::ReplicaId;
31use mz_expr::{
32 CollectionPlan, MapFilterProject, OptimizedMirRelationExpr, ResultSpec, RowSetFinishing,
33};
34use mz_ore::cast::CastFrom;
35use mz_ore::collections::{CollectionExt, HashSet};
36use mz_ore::task::{self, JoinHandle, spawn};
37use mz_ore::tracing::OpenTelemetryContext;
38use mz_ore::{assert_none, instrument};
39use mz_persist_client::stats::SnapshotPartStats;
40use mz_repr::adt::jsonb::Jsonb;
41use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
42use mz_repr::explain::ExprHumanizer;
43use mz_repr::explain::json::json_string;
44use mz_repr::role_id::RoleId;
45use mz_repr::{
46 CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, RelationVersion,
47 RelationVersionSelector, Row, RowArena, RowIterator, Timestamp,
48};
49use mz_sql::ast::{
50 AlterSourceAddSubsourceOption, CreateSourceOptionName, CreateSubsourceOption,
51 CreateSubsourceOptionName, SqlServerConfigOption, SqlServerConfigOptionName,
52};
53use mz_sql::ast::{CreateSubsourceStatement, MySqlConfigOptionName, UnresolvedItemName};
54use mz_sql::catalog::{
55 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError,
56 CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRole, CatalogSchema, CatalogTypeDetails,
57 ErrorMessageObjectDescription, ObjectType, RoleAttributes, RoleVars, SessionCatalog,
58};
59use mz_sql::names::{
60 Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
61 SchemaSpecifier, SystemObjectId,
62};
63use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext};
64use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
65use mz_storage_types::sinks::StorageSinkDesc;
66use mz_storage_types::sources::GenericSourceConnection;
67use mz_sql::plan::{
69 AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
70 Explainee, ExplaineeStatement, MutationKind, Params, Plan, PlannedAlterRoleOption,
71 PlannedRoleVariable, QueryWhen, SideEffectingFunc, UpdatePrivilege, VariableValue,
72};
73use mz_sql::session::metadata::SessionMetadata;
74use mz_sql::session::user::UserKind;
75use mz_sql::session::vars::{
76 self, IsolationLevel, NETWORK_POLICY, OwnedVarInput, SCHEMA_ALIAS, SessionVars,
77 TRANSACTION_ISOLATION_VAR_NAME, Var, VarError, VarInput,
78};
79use mz_sql::{plan, rbac};
80use mz_sql_parser::ast::display::AstDisplay;
81use mz_sql_parser::ast::{
82 ConnectionOption, ConnectionOptionName, CreateSourceConnection, DeferredItemName,
83 MySqlConfigOption, PgConfigOption, PgConfigOptionName, Statement, TransactionMode,
84 WithOptionValue,
85};
86use mz_ssh_util::keys::SshKeyPairSet;
87use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDescription};
88use mz_storage_types::AlterCompatible;
89use mz_storage_types::connections::inline::IntoInlineConnection;
90use mz_storage_types::controller::StorageError;
91use mz_storage_types::stats::RelationPartStats;
92use mz_transform::EmptyStatisticsOracle;
93use mz_transform::dataflow::DataflowMetainfo;
94use mz_transform::notice::{OptimizerNoticeApi, OptimizerNoticeKind, RawOptimizerNotice};
95use smallvec::SmallVec;
96use timely::progress::Antichain;
97use timely::progress::Timestamp as TimelyTimestamp;
98use tokio::sync::{oneshot, watch};
99use tracing::{Instrument, Span, info, warn};
100
101use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
102use crate::command::{ExecuteResponse, Response};
103use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
104use crate::coord::{
105 AlterConnectionValidationReady, AlterSinkReadyContext, Coordinator,
106 CreateConnectionValidationReady, DeferredPlanStatement, ExecuteContext, ExplainContext,
107 Message, NetworkPolicyError, PendingRead, PendingReadTxn, PendingTxn, PendingTxnResponse,
108 PlanValidity, StageResult, Staged, StagedContext, TargetCluster, WatchSetResponse,
109 validate_ip_with_policy_rules,
110};
111use crate::error::AdapterError;
112use crate::notice::{AdapterNotice, DroppedInUseIndex};
113use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
114use crate::optimize::{self, Optimize};
115use crate::session::{
116 EndTransactionAction, RequireLinearization, Session, TransactionOps, TransactionStatus,
117 WriteLocks, WriteOp,
118};
119use crate::util::{ClientTransmitter, ResultExt, viewable_variables};
120use crate::{PeekResponseUnary, ReadHolds};
121
122mod cluster;
123mod copy_from;
124mod create_continual_task;
125mod create_index;
126mod create_materialized_view;
127mod create_view;
128mod explain_timestamp;
129mod peek;
130mod secret;
131mod subscribe;
132
/// Unwraps a `Result`, or retires the given context with the error and
/// returns from the enclosing function.
///
/// On `Ok(value)` the macro evaluates to `value`. On `Err(err)` it expands to
/// `return $ctx.retire(Err(err.into()))`, so the error is converted via `Into`
/// and the enclosing function returns whatever `retire` returns.
macro_rules! return_if_err {
    ($result:expr, $ctx:expr) => {
        match $result {
            Ok(value) => value,
            Err(err) => return $ctx.retire(Err(err.into())),
        }
    };
}
143
144pub(super) use return_if_err;
145
146struct DropOps {
147 ops: Vec<catalog::Op>,
148 dropped_active_db: bool,
149 dropped_active_cluster: bool,
150 dropped_in_use_indexes: Vec<DroppedInUseIndex>,
151}
152
153struct CreateSourceInner {
155 ops: Vec<catalog::Op>,
156 sources: Vec<(CatalogItemId, Source)>,
157 if_not_exists_ids: BTreeMap<CatalogItemId, QualifiedItemName>,
158}
159
160impl Coordinator {
161 pub(crate) async fn sequence_staged<S>(
166 &mut self,
167 mut ctx: S::Ctx,
168 parent_span: Span,
169 mut stage: S,
170 ) where
171 S: Staged + 'static,
172 S::Ctx: Send + 'static,
173 {
174 return_if_err!(stage.validity().check(self.catalog()), ctx);
175 loop {
176 let mut cancel_enabled = stage.cancel_enabled();
177 if let Some(session) = ctx.session() {
178 if cancel_enabled {
179 if let Some((_prev_tx, prev_rx)) = self
182 .staged_cancellation
183 .insert(session.conn_id().clone(), watch::channel(false))
184 {
185 let was_canceled = *prev_rx.borrow();
186 if was_canceled {
187 ctx.retire(Err(AdapterError::Canceled));
188 return;
189 }
190 }
191 } else {
192 self.staged_cancellation.remove(session.conn_id());
195 }
196 } else {
197 cancel_enabled = false
198 };
199 let next = stage
200 .stage(self, &mut ctx)
201 .instrument(parent_span.clone())
202 .await;
203 let res = return_if_err!(next, ctx);
204 stage = match res {
205 StageResult::Handle(handle) => {
206 let internal_cmd_tx = self.internal_cmd_tx.clone();
207 self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, next| {
208 let _ = internal_cmd_tx.send(next.message(ctx, parent_span));
209 });
210 return;
211 }
212 StageResult::HandleRetire(handle) => {
213 self.handle_spawn(ctx, handle, cancel_enabled, move |ctx, resp| {
214 ctx.retire(Ok(resp));
215 });
216 return;
217 }
218 StageResult::Response(resp) => {
219 ctx.retire(Ok(resp));
220 return;
221 }
222 StageResult::Immediate(stage) => *stage,
223 }
224 }
225 }
226
227 fn handle_spawn<C, T, F>(
228 &self,
229 ctx: C,
230 handle: JoinHandle<Result<T, AdapterError>>,
231 cancel_enabled: bool,
232 f: F,
233 ) where
234 C: StagedContext + Send + 'static,
235 T: Send + 'static,
236 F: FnOnce(C, T) + Send + 'static,
237 {
238 let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
239 .session()
240 .and_then(|session| self.staged_cancellation.get(session.conn_id()))
241 {
242 let mut rx = rx.clone();
243 Box::pin(async move {
244 let _ = rx.wait_for(|v| *v).await;
246 ()
247 })
248 } else {
249 Box::pin(future::pending())
250 };
251 spawn(|| "sequence_staged", async move {
252 tokio::select! {
253 res = handle => {
254 let next = match res {
255 Ok(next) => return_if_err!(next, ctx),
256 Err(err) => {
257 tracing::error!("sequence_staged join error {err}");
258 ctx.retire(Err(AdapterError::Internal(
259 "sequence_staged join error".into(),
260 )));
261 return;
262 }
263 };
264 f(ctx, next);
265 }
266 _ = rx, if cancel_enabled => {
267 ctx.retire(Err(AdapterError::Canceled));
268 }
269 }
270 });
271 }
272
273 async fn create_source_inner(
274 &self,
275 session: &Session,
276 plans: Vec<plan::CreateSourcePlanBundle>,
277 ) -> Result<CreateSourceInner, AdapterError> {
278 let mut ops = vec![];
279 let mut sources = vec![];
280
281 let if_not_exists_ids = plans
282 .iter()
283 .filter_map(
284 |plan::CreateSourcePlanBundle {
285 item_id,
286 global_id: _,
287 plan,
288 resolved_ids: _,
289 available_source_references: _,
290 }| {
291 if plan.if_not_exists {
292 Some((*item_id, plan.name.clone()))
293 } else {
294 None
295 }
296 },
297 )
298 .collect::<BTreeMap<_, _>>();
299
300 for plan::CreateSourcePlanBundle {
301 item_id,
302 global_id,
303 mut plan,
304 resolved_ids,
305 available_source_references,
306 } in plans
307 {
308 let name = plan.name.clone();
309
310 match plan.source.data_source {
311 plan::DataSourceDesc::Ingestion(ref ingestion) => {
312 let cluster_id = plan
313 .in_cluster
314 .expect("ingestion plans must specify cluster");
315 match ingestion.desc.connection {
316 GenericSourceConnection::Postgres(_)
317 | GenericSourceConnection::MySql(_)
318 | GenericSourceConnection::SqlServer(_)
319 | GenericSourceConnection::Kafka(_)
320 | GenericSourceConnection::LoadGenerator(_) => {
321 if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
322 let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
323 .get(self.catalog().system_config().dyncfgs());
324
325 if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
326 {
327 return Err(AdapterError::Unsupported(
328 "sources in clusters with >1 replicas",
329 ));
330 }
331 }
332 }
333 }
334 }
335 plan::DataSourceDesc::Webhook { .. } => {
336 let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
337 if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
338 let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
339 .get(self.catalog().system_config().dyncfgs());
340
341 if !enable_multi_replica_sources {
342 if cluster.replica_ids().len() > 1 {
343 return Err(AdapterError::Unsupported(
344 "webhook sources in clusters with >1 replicas",
345 ));
346 }
347 }
348 }
349 }
350 plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
351 }
352
353 if let mz_sql::plan::DataSourceDesc::Webhook {
355 validate_using: Some(validate),
356 ..
357 } = &mut plan.source.data_source
358 {
359 if let Err(reason) = validate.reduce_expression().await {
360 self.metrics
361 .webhook_validation_reduce_failures
362 .with_label_values(&[reason])
363 .inc();
364 return Err(AdapterError::Internal(format!(
365 "failed to reduce check expression, {reason}"
366 )));
367 }
368 }
369
370 let mut reference_ops = vec![];
373 if let Some(references) = &available_source_references {
374 reference_ops.push(catalog::Op::UpdateSourceReferences {
375 source_id: item_id,
376 references: references.clone().into(),
377 });
378 }
379
380 let source = Source::new(plan, global_id, resolved_ids, None, false);
381 ops.push(catalog::Op::CreateItem {
382 id: item_id,
383 name,
384 item: CatalogItem::Source(source.clone()),
385 owner_id: *session.current_role_id(),
386 });
387 sources.push((item_id, source));
388 ops.extend(reference_ops);
390 }
391
392 Ok(CreateSourceInner {
393 ops,
394 sources,
395 if_not_exists_ids,
396 })
397 }
398
399 pub(crate) fn plan_subsource(
407 &self,
408 session: &Session,
409 params: &mz_sql::plan::Params,
410 subsource_stmt: CreateSubsourceStatement<mz_sql::names::Aug>,
411 item_id: CatalogItemId,
412 global_id: GlobalId,
413 ) -> Result<CreateSourcePlanBundle, AdapterError> {
414 let catalog = self.catalog().for_session(session);
415 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &subsource_stmt);
416
417 let plan = self.plan_statement(
418 session,
419 Statement::CreateSubsource(subsource_stmt),
420 params,
421 &resolved_ids,
422 )?;
423 let plan = match plan {
424 Plan::CreateSource(plan) => plan,
425 _ => unreachable!(),
426 };
427 Ok(CreateSourcePlanBundle {
428 item_id,
429 global_id,
430 plan,
431 resolved_ids,
432 available_source_references: None,
433 })
434 }
435
436 pub(crate) async fn plan_purified_alter_source_add_subsource(
438 &mut self,
439 session: &Session,
440 params: Params,
441 source_name: ResolvedItemName,
442 options: Vec<AlterSourceAddSubsourceOption<Aug>>,
443 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
444 ) -> Result<(Plan, ResolvedIds), AdapterError> {
445 let mut subsource_plans = Vec::with_capacity(subsources.len());
446
447 let conn_catalog = self.catalog().for_system_session();
449 let pcx = plan::PlanContext::zero();
450 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
451
452 let entry = self.catalog().get_entry(source_name.item_id());
453 let source = entry.source().ok_or_else(|| {
454 AdapterError::internal(
455 "plan alter source",
456 format!("expected Source found {entry:?}"),
457 )
458 })?;
459
460 let item_id = entry.id();
461 let ingestion_id = source.global_id();
462 let subsource_stmts = generate_subsource_statements(&scx, source_name, subsources)?;
463
464 let id_ts = self.get_catalog_write_ts().await;
465 let ids = self
466 .catalog_mut()
467 .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
468 .await?;
469 for (subsource_stmt, (item_id, global_id)) in
470 subsource_stmts.into_iter().zip_eq(ids.into_iter())
471 {
472 let s = self.plan_subsource(session, ¶ms, subsource_stmt, item_id, global_id)?;
473 subsource_plans.push(s);
474 }
475
476 let action = mz_sql::plan::AlterSourceAction::AddSubsourceExports {
477 subsources: subsource_plans,
478 options,
479 };
480
481 Ok((
482 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
483 item_id,
484 ingestion_id,
485 action,
486 }),
487 ResolvedIds::empty(),
488 ))
489 }
490
491 pub(crate) fn plan_purified_alter_source_refresh_references(
493 &self,
494 _session: &Session,
495 _params: Params,
496 source_name: ResolvedItemName,
497 available_source_references: plan::SourceReferences,
498 ) -> Result<(Plan, ResolvedIds), AdapterError> {
499 let entry = self.catalog().get_entry(source_name.item_id());
500 let source = entry.source().ok_or_else(|| {
501 AdapterError::internal(
502 "plan alter source",
503 format!("expected Source found {entry:?}"),
504 )
505 })?;
506 let action = mz_sql::plan::AlterSourceAction::RefreshReferences {
507 references: available_source_references,
508 };
509
510 Ok((
511 Plan::AlterSource(mz_sql::plan::AlterSourcePlan {
512 item_id: entry.id(),
513 ingestion_id: source.global_id(),
514 action,
515 }),
516 ResolvedIds::empty(),
517 ))
518 }
519
520 pub(crate) async fn plan_purified_create_source(
524 &mut self,
525 ctx: &ExecuteContext,
526 params: Params,
527 progress_stmt: CreateSubsourceStatement<Aug>,
528 mut source_stmt: mz_sql::ast::CreateSourceStatement<Aug>,
529 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
530 available_source_references: plan::SourceReferences,
531 ) -> Result<(Plan, ResolvedIds), AdapterError> {
532 let mut create_source_plans = Vec::with_capacity(subsources.len() + 2);
533
534 assert_none!(progress_stmt.of_source);
541 let id_ts = self.get_catalog_write_ts().await;
542 let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
543 let progress_plan =
544 self.plan_subsource(ctx.session(), ¶ms, progress_stmt, item_id, global_id)?;
545 let progress_full_name = self
546 .catalog()
547 .resolve_full_name(&progress_plan.plan.name, None);
548 let progress_subsource = ResolvedItemName::Item {
549 id: progress_plan.item_id,
550 qualifiers: progress_plan.plan.name.qualifiers.clone(),
551 full_name: progress_full_name,
552 print_id: true,
553 version: RelationVersionSelector::Latest,
554 };
555
556 create_source_plans.push(progress_plan);
557
558 source_stmt.progress_subsource = Some(DeferredItemName::Named(progress_subsource));
559
560 let catalog = self.catalog().for_session(ctx.session());
561 let resolved_ids = mz_sql::names::visit_dependencies(&catalog, &source_stmt);
562
563 let propagated_with_options: Vec<_> = source_stmt
564 .with_options
565 .iter()
566 .filter_map(|opt| match opt.name {
567 CreateSourceOptionName::TimestampInterval => None,
568 CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
569 name: CreateSubsourceOptionName::RetainHistory,
570 value: opt.value.clone(),
571 }),
572 })
573 .collect();
574
575 let source_plan = match self.plan_statement(
577 ctx.session(),
578 Statement::CreateSource(source_stmt),
579 ¶ms,
580 &resolved_ids,
581 )? {
582 Plan::CreateSource(plan) => plan,
583 p => unreachable!("s must be CreateSourcePlan but got {:?}", p),
584 };
585
586 let id_ts = self.get_catalog_write_ts().await;
587 let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
588
589 let source_full_name = self.catalog().resolve_full_name(&source_plan.name, None);
590 let of_source = ResolvedItemName::Item {
591 id: item_id,
592 qualifiers: source_plan.name.qualifiers.clone(),
593 full_name: source_full_name,
594 print_id: true,
595 version: RelationVersionSelector::Latest,
596 };
597
598 let conn_catalog = self.catalog().for_system_session();
600 let pcx = plan::PlanContext::zero();
601 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
602
603 let mut subsource_stmts = generate_subsource_statements(&scx, of_source, subsources)?;
604
605 for subsource_stmt in subsource_stmts.iter_mut() {
606 subsource_stmt
607 .with_options
608 .extend(propagated_with_options.iter().cloned())
609 }
610
611 create_source_plans.push(CreateSourcePlanBundle {
612 item_id,
613 global_id,
614 plan: source_plan,
615 resolved_ids: resolved_ids.clone(),
616 available_source_references: Some(available_source_references),
617 });
618
619 let id_ts = self.get_catalog_write_ts().await;
621 let ids = self
622 .catalog_mut()
623 .allocate_user_ids(u64::cast_from(subsource_stmts.len()), id_ts)
624 .await?;
625 for (stmt, (item_id, global_id)) in subsource_stmts.into_iter().zip_eq(ids.into_iter()) {
626 let plan = self.plan_subsource(ctx.session(), ¶ms, stmt, item_id, global_id)?;
627 create_source_plans.push(plan);
628 }
629
630 Ok((
631 Plan::CreateSources(create_source_plans),
632 ResolvedIds::empty(),
633 ))
634 }
635
636 #[instrument]
637 pub(super) async fn sequence_create_source(
638 &mut self,
639 ctx: &mut ExecuteContext,
640 plans: Vec<plan::CreateSourcePlanBundle>,
641 ) -> Result<ExecuteResponse, AdapterError> {
642 let item_ids: Vec<_> = plans.iter().map(|plan| plan.item_id).collect();
643 let CreateSourceInner {
644 ops,
645 sources,
646 if_not_exists_ids,
647 } = self.create_source_inner(ctx.session(), plans).await?;
648
649 let transact_result = self
650 .catalog_transact_with_ddl_transaction(ctx, ops, |coord, ctx| {
651 Box::pin(async move {
652 let mut collections = Vec::with_capacity(sources.len());
664
665 for (item_id, source) in sources {
666 let source_status_item_id =
667 coord.catalog().resolve_builtin_storage_collection(
668 &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
669 );
670 let source_status_collection_id = Some(
671 coord
672 .catalog()
673 .get_entry(&source_status_item_id)
674 .latest_global_id(),
675 );
676
677 let (data_source, status_collection_id) = match source.data_source {
678 DataSourceDesc::Ingestion {
679 ingestion_desc:
680 mz_sql::plan::Ingestion {
681 desc,
682 progress_subsource,
683 },
684 cluster_id,
685 } => {
686 let desc = desc.into_inline_connection(coord.catalog().state());
687 let progress_subsource = coord
690 .catalog()
691 .get_entry(&progress_subsource)
692 .latest_global_id();
693
694 let ingestion =
695 mz_storage_types::sources::IngestionDescription::new(
696 desc,
697 cluster_id,
698 progress_subsource,
699 );
700
701 (
702 DataSource::Ingestion(ingestion),
703 source_status_collection_id,
704 )
705 }
706 DataSourceDesc::IngestionExport {
707 ingestion_id,
708 external_reference: _,
709 details,
710 data_config,
711 } => {
712 let ingestion_id =
715 coord.catalog().get_entry(&ingestion_id).latest_global_id();
716 (
717 DataSource::IngestionExport {
718 ingestion_id,
719 details,
720 data_config: data_config
721 .into_inline_connection(coord.catalog().state()),
722 },
723 source_status_collection_id,
724 )
725 }
726 DataSourceDesc::Progress => (DataSource::Progress, None),
727 DataSourceDesc::Webhook { .. } => {
728 if let Some(url) =
729 coord.catalog().state().try_get_webhook_url(&item_id)
730 {
731 if let Some(ctx) = ctx.as_ref() {
732 ctx.session()
733 .add_notice(AdapterNotice::WebhookSourceCreated { url })
734 }
735 }
736
737 (DataSource::Webhook, None)
738 }
739 DataSourceDesc::Introspection(_) => {
740 unreachable!(
741 "cannot create sources with introspection data sources"
742 )
743 }
744 };
745
746 collections.push((
747 source.global_id,
748 CollectionDescription::<Timestamp> {
749 desc: source.desc.clone(),
750 data_source,
751 timeline: Some(source.timeline),
752 since: None,
753 status_collection_id,
754 },
755 ));
756 }
757
758 let storage_metadata = coord.catalog.state().storage_metadata();
759
760 coord
761 .controller
762 .storage
763 .create_collections(storage_metadata, None, collections)
764 .await
765 .unwrap_or_terminate("cannot fail to create collections");
766
767 let read_policies = coord.catalog().state().source_compaction_windows(item_ids);
783 for (compaction_window, storage_policies) in read_policies {
784 coord
785 .initialize_storage_read_policies(storage_policies, compaction_window)
786 .await;
787 }
788 })
789 })
790 .await;
791
792 match transact_result {
793 Ok(()) => Ok(ExecuteResponse::CreatedSource),
794 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
795 kind:
796 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(id, _)),
797 })) if if_not_exists_ids.contains_key(&id) => {
798 ctx.session()
799 .add_notice(AdapterNotice::ObjectAlreadyExists {
800 name: if_not_exists_ids[&id].item.clone(),
801 ty: "source",
802 });
803 Ok(ExecuteResponse::CreatedSource)
804 }
805 Err(err) => Err(err),
806 }
807 }
808
809 #[instrument]
810 pub(super) async fn sequence_create_connection(
811 &mut self,
812 mut ctx: ExecuteContext,
813 plan: plan::CreateConnectionPlan,
814 resolved_ids: ResolvedIds,
815 ) {
816 let id_ts = self.get_catalog_write_ts().await;
817 let (connection_id, connection_gid) = match self.catalog_mut().allocate_user_id(id_ts).await
818 {
819 Ok(item_id) => item_id,
820 Err(err) => return ctx.retire(Err(err.into())),
821 };
822
823 match &plan.connection.details {
824 ConnectionDetails::Ssh { key_1, key_2, .. } => {
825 let key_1 = match key_1.as_key_pair() {
826 Some(key_1) => key_1.clone(),
827 None => {
828 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
829 "the PUBLIC KEY 1 option cannot be explicitly specified"
830 ))));
831 }
832 };
833
834 let key_2 = match key_2.as_key_pair() {
835 Some(key_2) => key_2.clone(),
836 None => {
837 return ctx.retire(Err(AdapterError::Unstructured(anyhow!(
838 "the PUBLIC KEY 2 option cannot be explicitly specified"
839 ))));
840 }
841 };
842
843 let key_set = SshKeyPairSet::from_parts(key_1, key_2);
844 let secret = key_set.to_bytes();
845 if let Err(err) = self.secrets_controller.ensure(connection_id, &secret).await {
846 return ctx.retire(Err(err.into()));
847 }
848 }
849 _ => (),
850 };
851
852 if plan.validate {
853 let internal_cmd_tx = self.internal_cmd_tx.clone();
854 let transient_revision = self.catalog().transient_revision();
855 let conn_id = ctx.session().conn_id().clone();
856 let otel_ctx = OpenTelemetryContext::obtain();
857 let role_metadata = ctx.session().role_metadata().clone();
858
859 let connection = plan
860 .connection
861 .details
862 .to_connection()
863 .into_inline_connection(self.catalog().state());
864
865 let current_storage_parameters = self.controller.storage.config().clone();
866 task::spawn(|| format!("validate_connection:{conn_id}"), async move {
867 let result = match connection
868 .validate(connection_id, ¤t_storage_parameters)
869 .await
870 {
871 Ok(()) => Ok(plan),
872 Err(err) => Err(err.into()),
873 };
874
875 let result = internal_cmd_tx.send(Message::CreateConnectionValidationReady(
877 CreateConnectionValidationReady {
878 ctx,
879 result,
880 connection_id,
881 connection_gid,
882 plan_validity: PlanValidity::new(
883 transient_revision,
884 resolved_ids.items().copied().collect(),
885 None,
886 None,
887 role_metadata,
888 ),
889 otel_ctx,
890 resolved_ids: resolved_ids.clone(),
891 },
892 ));
893 if let Err(e) = result {
894 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
895 }
896 });
897 } else {
898 let result = self
899 .sequence_create_connection_stage_finish(
900 &mut ctx,
901 connection_id,
902 connection_gid,
903 plan,
904 resolved_ids,
905 )
906 .await;
907 ctx.retire(result);
908 }
909 }
910
911 #[instrument]
912 pub(crate) async fn sequence_create_connection_stage_finish(
913 &mut self,
914 ctx: &mut ExecuteContext,
915 connection_id: CatalogItemId,
916 connection_gid: GlobalId,
917 plan: plan::CreateConnectionPlan,
918 resolved_ids: ResolvedIds,
919 ) -> Result<ExecuteResponse, AdapterError> {
920 let ops = vec![catalog::Op::CreateItem {
921 id: connection_id,
922 name: plan.name.clone(),
923 item: CatalogItem::Connection(Connection {
924 create_sql: plan.connection.create_sql,
925 global_id: connection_gid,
926 details: plan.connection.details.clone(),
927 resolved_ids,
928 }),
929 owner_id: *ctx.session().current_role_id(),
930 }];
931
932 let transact_result = self
933 .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
934 Box::pin(async move {
935 match plan.connection.details {
936 ConnectionDetails::AwsPrivatelink(ref privatelink) => {
937 let spec = VpcEndpointConfig {
938 aws_service_name: privatelink.service_name.to_owned(),
939 availability_zone_ids: privatelink.availability_zones.to_owned(),
940 };
941 let cloud_resource_controller =
942 match coord.cloud_resource_controller.as_ref().cloned() {
943 Some(controller) => controller,
944 None => {
945 tracing::warn!("AWS PrivateLink connections unsupported");
946 return;
947 }
948 };
949 if let Err(err) = cloud_resource_controller
950 .ensure_vpc_endpoint(connection_id, spec)
951 .await
952 {
953 tracing::warn!(?err, "failed to ensure vpc endpoint!");
954 }
955 }
956 _ => {}
957 }
958 })
959 })
960 .await;
961
962 match transact_result {
963 Ok(_) => Ok(ExecuteResponse::CreatedConnection),
964 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
965 kind:
966 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
967 })) if plan.if_not_exists => {
968 ctx.session()
969 .add_notice(AdapterNotice::ObjectAlreadyExists {
970 name: plan.name.item,
971 ty: "connection",
972 });
973 Ok(ExecuteResponse::CreatedConnection)
974 }
975 Err(err) => Err(err),
976 }
977 }
978
979 #[instrument]
980 pub(super) async fn sequence_create_database(
981 &mut self,
982 session: &mut Session,
983 plan: plan::CreateDatabasePlan,
984 ) -> Result<ExecuteResponse, AdapterError> {
985 let ops = vec![catalog::Op::CreateDatabase {
986 name: plan.name.clone(),
987 owner_id: *session.current_role_id(),
988 }];
989 match self.catalog_transact(Some(session), ops).await {
990 Ok(_) => Ok(ExecuteResponse::CreatedDatabase),
991 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
992 kind:
993 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::DatabaseAlreadyExists(_)),
994 })) if plan.if_not_exists => {
995 session.add_notice(AdapterNotice::DatabaseAlreadyExists { name: plan.name });
996 Ok(ExecuteResponse::CreatedDatabase)
997 }
998 Err(err) => Err(err),
999 }
1000 }
1001
1002 #[instrument]
1003 pub(super) async fn sequence_create_schema(
1004 &mut self,
1005 session: &mut Session,
1006 plan: plan::CreateSchemaPlan,
1007 ) -> Result<ExecuteResponse, AdapterError> {
1008 let op = catalog::Op::CreateSchema {
1009 database_id: plan.database_spec,
1010 schema_name: plan.schema_name.clone(),
1011 owner_id: *session.current_role_id(),
1012 };
1013 match self.catalog_transact(Some(session), vec![op]).await {
1014 Ok(_) => Ok(ExecuteResponse::CreatedSchema),
1015 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1016 kind:
1017 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::SchemaAlreadyExists(_)),
1018 })) if plan.if_not_exists => {
1019 session.add_notice(AdapterNotice::SchemaAlreadyExists {
1020 name: plan.schema_name,
1021 });
1022 Ok(ExecuteResponse::CreatedSchema)
1023 }
1024 Err(err) => Err(err),
1025 }
1026 }
1027
1028 fn validate_role_attributes(&self, attributes: &RoleAttributes) -> Result<(), AdapterError> {
1030 if !ENABLE_PASSWORD_AUTH.get(self.catalog().system_config().dyncfgs()) {
1031 if attributes.superuser.is_some()
1032 || attributes.password.is_some()
1033 || attributes.login.is_some()
1034 {
1035 return Err(AdapterError::UnavailableFeature {
1036 feature: "SUPERUSER, PASSWORD, and LOGIN attributes".to_string(),
1037 docs: Some("https://materialize.com/docs/sql/create-role/#details".to_string()),
1038 });
1039 }
1040 }
1041 Ok(())
1042 }
1043
1044 #[instrument]
1045 pub(super) async fn sequence_create_role(
1046 &mut self,
1047 conn_id: Option<&ConnectionId>,
1048 plan::CreateRolePlan { name, attributes }: plan::CreateRolePlan,
1049 ) -> Result<ExecuteResponse, AdapterError> {
1050 self.validate_role_attributes(&attributes.clone())?;
1051 let op = catalog::Op::CreateRole { name, attributes };
1052 self.catalog_transact_conn(conn_id, vec![op])
1053 .await
1054 .map(|_| ExecuteResponse::CreatedRole)
1055 }
1056
1057 #[instrument]
1058 pub(super) async fn sequence_create_network_policy(
1059 &mut self,
1060 session: &Session,
1061 plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan,
1062 ) -> Result<ExecuteResponse, AdapterError> {
1063 let op = catalog::Op::CreateNetworkPolicy {
1064 rules,
1065 name,
1066 owner_id: *session.current_role_id(),
1067 };
1068 self.catalog_transact_conn(Some(session.conn_id()), vec![op])
1069 .await
1070 .map(|_| ExecuteResponse::CreatedNetworkPolicy)
1071 }
1072
1073 #[instrument]
1074 pub(super) async fn sequence_alter_network_policy(
1075 &mut self,
1076 session: &Session,
1077 plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan,
1078 ) -> Result<ExecuteResponse, AdapterError> {
1079 let current_network_policy_name = self
1081 .owned_catalog()
1082 .system_config()
1083 .default_network_policy_name();
1084 if current_network_policy_name == name {
1086 self.validate_alter_network_policy(session, &rules)?;
1087 }
1088
1089 let op = catalog::Op::AlterNetworkPolicy {
1090 id,
1091 rules,
1092 name,
1093 owner_id: *session.current_role_id(),
1094 };
1095 self.catalog_transact_conn(Some(session.conn_id()), vec![op])
1096 .await
1097 .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy))
1098 }
1099
    /// Sequences a `CREATE TABLE` plan.
    ///
    /// Allocates a fresh user ID, builds the in-memory [`Table`] from the plan, and submits a
    /// single `CreateItem` op through `catalog_transact_with_ddl_transaction`. The closure
    /// passed alongside the op creates the backing storage collection and initializes its
    /// read policies.
    ///
    /// Returns `ExecuteResponse::CreatedTable` on success. If the catalog reports
    /// `ItemAlreadyExists` and `if_not_exists` is set, an `ObjectAlreadyExists` notice is added
    /// to the session and `CreatedTable` is returned as well. Every other error is returned
    /// unchanged.
    ///
    /// # Panics
    ///
    /// Panics if a webhook data source has no `cluster_id`, or if the plan's data source is
    /// anything other than table writes, an ingestion export, or a webhook.
    #[instrument]
    pub(super) async fn sequence_create_table(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::CreateTablePlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let plan::CreateTablePlan {
            name,
            table,
            if_not_exists,
        } = plan;

        // Only temporary tables record the connection that created them.
        let conn_id = if table.temporary {
            Some(ctx.session().conn_id())
        } else {
            None
        };
        let id_ts = self.get_catalog_write_ts().await;
        let (table_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
        // A new table starts with exactly one collection, at the root relation version.
        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();

        // Translate the planner's data source description into the catalog's representation.
        let data_source = match table.data_source {
            plan::TableDataSource::TableWrites { defaults } => {
                TableDataSource::TableWrites { defaults }
            }
            plan::TableDataSource::DataSource {
                desc: data_source_plan,
                timeline,
            } => match data_source_plan {
                plan::DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    timeline,
                },
                plan::DataSourceDesc::Webhook {
                    validate_using,
                    body_format,
                    headers,
                    cluster_id,
                } => TableDataSource::DataSource {
                    desc: DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id: cluster_id.expect("Webhook Tables must have cluster_id set"),
                    },
                    timeline,
                },
                // No other data source kind is expected to reach `CREATE TABLE`.
                o => {
                    unreachable!("CREATE TABLE data source got {:?}", o)
                }
            },
        };
        let table = Table {
            create_sql: Some(table.create_sql),
            desc: table.desc,
            collections,
            conn_id: conn_id.cloned(),
            resolved_ids,
            custom_logical_compaction_window: table.compaction_window,
            is_retained_metrics_object: false,
            data_source,
        };
        let ops = vec![catalog::Op::CreateItem {
            id: table_id,
            name: name.clone(),
            item: CatalogItem::Table(table.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        let catalog_result = self
            .catalog_transact_with_ddl_transaction(ctx, ops, move |coord, ctx| {
                Box::pin(async move {
                    // Work out, per data source kind, which storage collections to create, the
                    // timestamp to register them at (if any), and the read policies to install.
                    let (collections, register_ts, read_policies) = match table.data_source {
                        TableDataSource::TableWrites { defaults: _ } => {
                            let register_ts = coord.get_local_write_ts().await.timestamp;

                            // NOTE(review): presumably leadership is re-confirmed here so that
                            // `register_ts` is not used by a coordinator that has been fenced
                            // out — confirm.
                            coord
                                .catalog
                                .confirm_leadership()
                                .await
                                .unwrap_or_terminate("unable to confirm leadership");

                            // Record the registration timestamp against the statement, when the
                            // context carries a statement logging id.
                            if let Some(id) = ctx.as_ref().and_then(|ctx| ctx.extra().contents()) {
                                coord.set_statement_execution_timestamp(id, register_ts);
                            }

                            // A freshly created table must only have the root version.
                            let relation_version = RelationVersion::root();
                            assert_eq!(table.desc.latest_version(), relation_version);
                            let relation_desc = table
                                .desc
                                .at_version(RelationVersionSelector::Specific(relation_version));
                            let collection_desc =
                                CollectionDescription::for_table(relation_desc, None);
                            let collections = vec![(global_id, collection_desc)];

                            // Fall back to the default compaction window when the table does
                            // not specify a custom one.
                            let compaction_window = table
                                .custom_logical_compaction_window
                                .unwrap_or(CompactionWindow::Default);
                            let read_policies =
                                BTreeMap::from([(compaction_window, btreeset! { table_id })]);

                            (collections, Some(register_ts), read_policies)
                        }
                        TableDataSource::DataSource {
                            desc: data_source,
                            timeline,
                        } => {
                            match data_source {
                                DataSourceDesc::IngestionExport {
                                    ingestion_id,
                                    external_reference: _,
                                    details,
                                    data_config,
                                } => {
                                    // Status updates for the export are attached to the builtin
                                    // source status history collection.
                                    let source_status_item_id =
                                        coord.catalog().resolve_builtin_storage_collection(
                                            &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
                                        );
                                    let status_collection_id = Some(
                                        coord
                                            .catalog()
                                            .get_entry(&source_status_item_id)
                                            .latest_global_id(),
                                    );
                                    // Resolve the ingestion's catalog item id to its latest
                                    // global id, shadowing the item id.
                                    let ingestion_id =
                                        coord.catalog().get_entry(&ingestion_id).latest_global_id();
                                    let collection_desc = CollectionDescription::<Timestamp> {
                                        desc: table
                                            .desc
                                            .at_version(RelationVersionSelector::Latest),
                                        data_source: DataSource::IngestionExport {
                                            ingestion_id,
                                            details,
                                            data_config: data_config
                                                .into_inline_connection(coord.catalog.state()),
                                        },
                                        since: None,
                                        status_collection_id,
                                        timeline: Some(timeline.clone()),
                                    };

                                    let collections = vec![(global_id, collection_desc)];
                                    let read_policies = coord
                                        .catalog()
                                        .state()
                                        .source_compaction_windows(vec![table_id]);

                                    // No registration timestamp is used for ingestion exports.
                                    (collections, None, read_policies)
                                }
                                DataSourceDesc::Webhook { .. } => {
                                    // Tell the user which URL the webhook is reachable at, when
                                    // both a URL and an execution context are available.
                                    if let Some(url) =
                                        coord.catalog().state().try_get_webhook_url(&table_id)
                                    {
                                        if let Some(ctx) = ctx.as_ref() {
                                            ctx.session().add_notice(
                                                AdapterNotice::WebhookSourceCreated { url },
                                            )
                                        }
                                    }

                                    // Webhook tables are expected to have only the root version.
                                    assert_eq!(
                                        table.desc.latest_version(),
                                        RelationVersion::root(),
                                        "found webhook with more than 1 relation version, {:?}",
                                        table.desc
                                    );
                                    let desc = table.desc.latest();

                                    let collection_desc = CollectionDescription {
                                        desc,
                                        data_source: DataSource::Webhook,
                                        since: None,
                                        status_collection_id: None,
                                        timeline: Some(timeline.clone()),
                                    };
                                    let collections = vec![(global_id, collection_desc)];
                                    let read_policies = coord
                                        .catalog()
                                        .state()
                                        .source_compaction_windows(vec![table_id]);

                                    // No registration timestamp is used for webhooks either.
                                    (collections, None, read_policies)
                                }
                                _ => unreachable!("CREATE TABLE data source got {:?}", data_source),
                            }
                        }
                    };

                    // Create the collections in the storage controller.
                    // NOTE(review): `unwrap_or_terminate` presumably halts the process on
                    // error rather than returning it — confirm.
                    let storage_metadata = coord.catalog.state().storage_metadata();
                    coord
                        .controller
                        .storage
                        .create_collections(storage_metadata, register_ts, collections)
                        .await
                        .unwrap_or_terminate("cannot fail to create collections");

                    // Only the table-writes path produced a registration timestamp to apply.
                    if let Some(register_ts) = register_ts {
                        coord.apply_local_write(register_ts).await;
                    }

                    // Install read policies, grouped by compaction window.
                    for (compaction_window, storage_policies) in read_policies {
                        coord
                            .initialize_storage_read_policies(storage_policies, compaction_window)
                            .await;
                    }
                })
            })
            .await;

        match catalog_result {
            Ok(()) => Ok(ExecuteResponse::CreatedTable),
            // `IF NOT EXISTS` turns an "already exists" error into a notice plus success.
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session_mut()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "table",
                    });
                Ok(ExecuteResponse::CreatedTable)
            }
            Err(err) => Err(err),
        }
    }
1358
    /// Sequences a `CREATE SINK` plan.
    ///
    /// Unlike most sequencing methods here, this one takes ownership of `ctx` and reports its
    /// outcome by retiring the context instead of returning a `Result`.
    ///
    /// Steps, in order:
    /// 1. Allocate a user ID for the sink.
    /// 2. Check that the relation the sink reads from exists in the storage controller.
    /// 3. Add the sink to the catalog with a `CreateItem` op.
    /// 4. Create the storage export and initialize its read policy.
    ///
    /// If the catalog reports `ItemAlreadyExists` and `if_not_exists` is set, an
    /// `ObjectAlreadyExists` notice is added and the context is retired with
    /// `ExecuteResponse::CreatedSink` without creating a storage export.
    #[instrument]
    pub(super) async fn sequence_create_sink(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CreateSinkPlan,
        resolved_ids: ResolvedIds,
    ) {
        let plan::CreateSinkPlan {
            name,
            sink,
            with_snapshot,
            if_not_exists,
            in_cluster,
        } = plan;

        let id_ts = self.get_catalog_write_ts().await;
        // NOTE(review): `return_if_err!` presumably retires `ctx` with the error and returns
        // early — confirm against the macro definition.
        let (item_id, global_id) =
            return_if_err!(self.catalog_mut().allocate_user_id(id_ts).await, ctx);

        let catalog_sink = Sink {
            create_sql: sink.create_sql,
            global_id,
            from: sink.from,
            connection: sink.connection,
            envelope: sink.envelope,
            version: sink.version,
            with_snapshot,
            resolved_ids,
            cluster_id: in_cluster,
        };

        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::Sink(catalog_sink.clone()),
            owner_id: *ctx.session().current_role_id(),
        }];

        // Before touching the catalog, make sure the sink's input is known to the storage
        // controller. A missing identifier is reported as a user-facing error naming the
        // object and its type; any other storage error is passed through.
        let from = self.catalog().get_entry_by_global_id(&catalog_sink.from);
        if let Err(e) = self
            .controller
            .storage
            .check_exists(sink.from)
            .map_err(|e| match e {
                StorageError::IdentifierMissing(_) => AdapterError::Unstructured(anyhow!(
                    "{} is a {}, which cannot be exported as a sink",
                    from.name().item.clone(),
                    from.item().typ().to_string(),
                )),
                e => AdapterError::Storage(e),
            })
        {
            ctx.retire(Err(e));
            return;
        }

        let result = self.catalog_transact(Some(ctx.session()), ops).await;

        match result {
            Ok(()) => {}
            // `IF NOT EXISTS` turns an "already exists" error into a notice plus success.
            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
                kind:
                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
            })) if if_not_exists => {
                ctx.session()
                    .add_notice(AdapterNotice::ObjectAlreadyExists {
                        name: name.item,
                        ty: "sink",
                    });
                ctx.retire(Ok(ExecuteResponse::CreatedSink));
                return;
            }
            Err(e) => {
                ctx.retire(Err(e));
                return;
            }
        };

        // The catalog now contains the sink; create the matching storage export.
        // NOTE(review): `unwrap_or_terminate` presumably halts the process on error rather
        // than returning it — confirm.
        self.create_storage_export(global_id, &catalog_sink)
            .await
            .unwrap_or_terminate("cannot fail to create exports");

        self.initialize_storage_read_policies([item_id].into(), CompactionWindow::Default)
            .await;

        ctx.retire(Ok(ExecuteResponse::CreatedSink))
    }
1447
1448 pub(super) fn validate_system_column_references(
1471 &self,
1472 uses_ambiguous_columns: bool,
1473 depends_on: &BTreeSet<GlobalId>,
1474 ) -> Result<(), AdapterError> {
1475 if uses_ambiguous_columns
1476 && depends_on
1477 .iter()
1478 .any(|id| id.is_system() && self.catalog().get_entry_by_global_id(id).is_relation())
1479 {
1480 Err(AdapterError::AmbiguousSystemColumnReference)
1481 } else {
1482 Ok(())
1483 }
1484 }
1485
1486 #[instrument]
1487 pub(super) async fn sequence_create_type(
1488 &mut self,
1489 session: &Session,
1490 plan: plan::CreateTypePlan,
1491 resolved_ids: ResolvedIds,
1492 ) -> Result<ExecuteResponse, AdapterError> {
1493 let id_ts = self.get_catalog_write_ts().await;
1494 let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
1495 let typ = Type {
1496 create_sql: Some(plan.typ.create_sql),
1497 global_id,
1498 desc: plan.typ.inner.desc(&self.catalog().for_session(session))?,
1499 details: CatalogTypeDetails {
1500 array_id: None,
1501 typ: plan.typ.inner,
1502 pg_metadata: None,
1503 },
1504 resolved_ids,
1505 };
1506 let op = catalog::Op::CreateItem {
1507 id: item_id,
1508 name: plan.name,
1509 item: CatalogItem::Type(typ),
1510 owner_id: *session.current_role_id(),
1511 };
1512 match self.catalog_transact(Some(session), vec![op]).await {
1513 Ok(()) => Ok(ExecuteResponse::CreatedType),
1514 Err(err) => Err(err),
1515 }
1516 }
1517
1518 #[instrument]
1519 pub(super) async fn sequence_comment_on(
1520 &mut self,
1521 session: &Session,
1522 plan: plan::CommentPlan,
1523 ) -> Result<ExecuteResponse, AdapterError> {
1524 let op = catalog::Op::Comment {
1525 object_id: plan.object_id,
1526 sub_component: plan.sub_component,
1527 comment: plan.comment,
1528 };
1529 self.catalog_transact(Some(session), vec![op]).await?;
1530 Ok(ExecuteResponse::Comment)
1531 }
1532
1533 #[instrument]
1534 pub(super) async fn sequence_drop_objects(
1535 &mut self,
1536 session: &Session,
1537 plan::DropObjectsPlan {
1538 drop_ids,
1539 object_type,
1540 referenced_ids,
1541 }: plan::DropObjectsPlan,
1542 ) -> Result<ExecuteResponse, AdapterError> {
1543 let referenced_ids_hashset = referenced_ids.iter().collect::<HashSet<_>>();
1544 let mut objects = Vec::new();
1545 for obj_id in &drop_ids {
1546 if !referenced_ids_hashset.contains(obj_id) {
1547 let object_info = ErrorMessageObjectDescription::from_id(
1548 obj_id,
1549 &self.catalog().for_session(session),
1550 )
1551 .to_string();
1552 objects.push(object_info);
1553 }
1554 }
1555
1556 if !objects.is_empty() {
1557 session.add_notice(AdapterNotice::CascadeDroppedObject { objects });
1558 }
1559
1560 let DropOps {
1561 ops,
1562 dropped_active_db,
1563 dropped_active_cluster,
1564 dropped_in_use_indexes,
1565 } = self.sequence_drop_common(session, drop_ids)?;
1566
1567 self.catalog_transact(Some(session), ops).await?;
1568
1569 fail::fail_point!("after_sequencer_drop_replica");
1570
1571 if dropped_active_db {
1572 session.add_notice(AdapterNotice::DroppedActiveDatabase {
1573 name: session.vars().database().to_string(),
1574 });
1575 }
1576 if dropped_active_cluster {
1577 session.add_notice(AdapterNotice::DroppedActiveCluster {
1578 name: session.vars().cluster().to_string(),
1579 });
1580 }
1581 for dropped_in_use_index in dropped_in_use_indexes {
1582 session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1583 self.metrics
1584 .optimization_notices
1585 .with_label_values(&["DroppedInUseIndex"])
1586 .inc_by(1);
1587 }
1588 Ok(ExecuteResponse::DroppedObject(object_type))
1589 }
1590
    /// Verifies that none of the roles about to be dropped is still depended upon.
    ///
    /// `dropped_roles` maps the id of each role being dropped to its name. The catalog is
    /// scanned for anything that still references one of those roles:
    ///
    /// * ownership of items, databases, schemas, clusters, and cluster replicas;
    /// * privileges on items, databases, schemas, clusters, and the system where the role is
    ///   the grantee or the grantor;
    /// * default privileges where the role is the object's role or a grantee.
    ///
    /// Returns `Ok(())` if nothing references the roles. Otherwise returns
    /// `AdapterError::DependentObject` with a map from role name to human-readable
    /// descriptions of each dependency found.
    fn validate_dropped_role_ownership(
        &self,
        session: &Session,
        dropped_roles: &BTreeMap<RoleId, &str>,
    ) -> Result<(), AdapterError> {
        /// Records a dependency for every privilege in `privileges` whose grantee or grantor
        /// is one of `dropped_roles`. A privilege where both sides are dropped roles is
        /// recorded once per side.
        fn privilege_check(
            privileges: &PrivilegeMap,
            dropped_roles: &BTreeMap<RoleId, &str>,
            dependent_objects: &mut BTreeMap<String, Vec<String>>,
            object_id: &SystemObjectId,
            catalog: &ConnCatalog,
        ) {
            for privilege in privileges.all_values() {
                // The dropped role received this privilege.
                if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
                    let grantor_name = catalog.get_role(&privilege.grantor).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges on {object_description} granted by {grantor_name}",
                        ));
                }
                // The dropped role handed out this privilege.
                if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
                    let grantee_name = catalog.get_role(&privilege.grantee).name();
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(object_id, catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "privileges granted on {object_description} to {grantee_name}"
                        ));
                }
            }
        }

        let catalog = self.catalog().for_session(session);
        // Role name -> descriptions of everything that still depends on that role.
        let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
        // Catalog items: ownership and privileges.
        for entry in self.catalog.entries() {
            let id = SystemObjectId::Object(entry.id().into());
            if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
                let object_description = ErrorMessageObjectDescription::from_sys_id(&id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                entry.privileges(),
                dropped_roles,
                &mut dependent_objects,
                &id,
                &catalog,
            );
        }
        // Databases and the schemas they contain: ownership and privileges.
        for database in self.catalog.databases() {
            let database_id = SystemObjectId::Object(database.id().into());
            if let Some(role_name) = dropped_roles.get(&database.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&database_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &database.privileges,
                dropped_roles,
                &mut dependent_objects,
                &database_id,
                &catalog,
            );
            for schema in database.schemas_by_id.values() {
                let schema_id = SystemObjectId::Object(
                    (ResolvedDatabaseSpecifier::Id(database.id()), *schema.id()).into(),
                );
                if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&schema_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
                privilege_check(
                    &schema.privileges,
                    dropped_roles,
                    &mut dependent_objects,
                    &schema_id,
                    &catalog,
                );
            }
        }
        // Clusters: ownership and privileges. Replicas are checked for ownership only.
        for cluster in self.catalog.clusters() {
            let cluster_id = SystemObjectId::Object(cluster.id().into());
            if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
                let object_description =
                    ErrorMessageObjectDescription::from_sys_id(&cluster_id, &catalog);
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!("owner of {object_description}"));
            }
            privilege_check(
                &cluster.privileges,
                dropped_roles,
                &mut dependent_objects,
                &cluster_id,
                &catalog,
            );
            for replica in cluster.replicas() {
                if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
                    let replica_id =
                        SystemObjectId::Object((replica.cluster_id(), replica.replica_id()).into());
                    let object_description =
                        ErrorMessageObjectDescription::from_sys_id(&replica_id, &catalog);
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!("owner of {object_description}"));
                }
            }
        }
        // System-level privileges.
        privilege_check(
            self.catalog().system_privileges(),
            dropped_roles,
            &mut dependent_objects,
            &SystemObjectId::System,
            &catalog,
        );
        // Default privileges: both the role they are defined for and each grantee.
        for (default_privilege_object, default_privilege_acl_items) in
            self.catalog.default_privileges()
        {
            if let Some(role_name) = dropped_roles.get(&default_privilege_object.role_id) {
                dependent_objects
                    .entry(role_name.to_string())
                    .or_default()
                    .push(format!(
                        "default privileges on {}S created by {}",
                        default_privilege_object.object_type, role_name
                    ));
            }
            for default_privilege_acl_item in default_privilege_acl_items {
                if let Some(role_name) = dropped_roles.get(&default_privilege_acl_item.grantee) {
                    dependent_objects
                        .entry(role_name.to_string())
                        .or_default()
                        .push(format!(
                            "default privileges on {}S granted to {}",
                            default_privilege_object.object_type, role_name
                        ));
                }
            }
        }

        // Any recorded dependency blocks the drop.
        if !dependent_objects.is_empty() {
            Err(AdapterError::DependentObject(dependent_objects))
        } else {
            Ok(())
        }
    }
1754
1755 #[instrument]
1756 pub(super) async fn sequence_drop_owned(
1757 &mut self,
1758 session: &Session,
1759 plan: plan::DropOwnedPlan,
1760 ) -> Result<ExecuteResponse, AdapterError> {
1761 for role_id in &plan.role_ids {
1762 self.catalog().ensure_not_reserved_role(role_id)?;
1763 }
1764
1765 let mut privilege_revokes = plan.privilege_revokes;
1766
1767 let session_catalog = self.catalog().for_session(session);
1769 if rbac::is_rbac_enabled_for_session(session_catalog.system_vars(), session)
1770 && !session.is_superuser()
1771 {
1772 let role_membership =
1774 session_catalog.collect_role_membership(session.current_role_id());
1775 let invalid_revokes: BTreeSet<_> = privilege_revokes
1776 .extract_if(.., |(_, privilege)| {
1777 !role_membership.contains(&privilege.grantor)
1778 })
1779 .map(|(object_id, _)| object_id)
1780 .collect();
1781 for invalid_revoke in invalid_revokes {
1782 let object_description =
1783 ErrorMessageObjectDescription::from_sys_id(&invalid_revoke, &session_catalog);
1784 session.add_notice(AdapterNotice::CannotRevoke { object_description });
1785 }
1786 }
1787
1788 let privilege_revoke_ops = privilege_revokes.into_iter().map(|(object_id, privilege)| {
1789 catalog::Op::UpdatePrivilege {
1790 target_id: object_id,
1791 privilege,
1792 variant: UpdatePrivilegeVariant::Revoke,
1793 }
1794 });
1795 let default_privilege_revoke_ops = plan.default_privilege_revokes.into_iter().map(
1796 |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
1797 privilege_object,
1798 privilege_acl_item,
1799 variant: UpdatePrivilegeVariant::Revoke,
1800 },
1801 );
1802 let DropOps {
1803 ops: drop_ops,
1804 dropped_active_db,
1805 dropped_active_cluster,
1806 dropped_in_use_indexes,
1807 } = self.sequence_drop_common(session, plan.drop_ids)?;
1808
1809 let ops = privilege_revoke_ops
1810 .chain(default_privilege_revoke_ops)
1811 .chain(drop_ops.into_iter())
1812 .collect();
1813
1814 self.catalog_transact(Some(session), ops).await?;
1815
1816 if dropped_active_db {
1817 session.add_notice(AdapterNotice::DroppedActiveDatabase {
1818 name: session.vars().database().to_string(),
1819 });
1820 }
1821 if dropped_active_cluster {
1822 session.add_notice(AdapterNotice::DroppedActiveCluster {
1823 name: session.vars().cluster().to_string(),
1824 });
1825 }
1826 for dropped_in_use_index in dropped_in_use_indexes {
1827 session.add_notice(AdapterNotice::DroppedInUseIndex(dropped_in_use_index));
1828 }
1829 Ok(ExecuteResponse::DroppedOwned)
1830 }
1831
    /// Expands a list of object ids to drop into the catalog ops needed to drop them.
    ///
    /// Shared by `sequence_drop_objects` and `sequence_drop_owned`. Besides the final
    /// `DropObjects` op for `ids`, the returned ops include:
    ///
    /// * `RevokeRole` ops for every membership a dropped role holds and for every membership
    ///   user roles hold in a dropped role;
    /// * `UpdateDefaultPrivilege` revoke ops for default privileges scoped to a dropped
    ///   database or schema.
    ///
    /// The returned [`DropOps`] also reports whether the session's active database or active
    /// cluster is among the dropped objects, and which dropped indexes still have dependants
    /// that are not themselves being dropped.
    ///
    /// # Errors
    ///
    /// Fails if a replica of a managed cluster is dropped without its cluster (unless the
    /// replica's location is internal), if a dropped role is reserved, or if
    /// `validate_dropped_role_ownership` finds that a dropped role is still depended upon.
    fn sequence_drop_common(
        &self,
        session: &Session,
        ids: Vec<ObjectId>,
    ) -> Result<DropOps, AdapterError> {
        let mut dropped_active_db = false;
        let mut dropped_active_cluster = false;
        let mut dropped_in_use_indexes = Vec::new();
        let mut dropped_roles = BTreeMap::new();
        let mut dropped_databases = BTreeSet::new();
        let mut dropped_schemas = BTreeSet::new();
        // Role memberships to revoke, as (role, member, grantor) triples. Sets are used for
        // both collections below so duplicate revokes are issued only once.
        let mut role_revokes = BTreeSet::new();
        // Default privileges to revoke, as (object, ACL item) pairs.
        let mut default_privilege_revokes = BTreeSet::new();

        // Clusters dropped in this request; replicas of these clusters skip the managed
        // cluster check below.
        let mut clusters_to_drop = BTreeSet::new();

        let ids_set = ids.iter().collect::<BTreeSet<_>>();
        // First pass: classify each id and gather per-kind bookkeeping.
        for id in &ids {
            match id {
                ObjectId::Database(id) => {
                    let name = self.catalog().get_database(id).name();
                    if name == session.vars().database() {
                        dropped_active_db = true;
                    }
                    dropped_databases.insert(id);
                }
                ObjectId::Schema((_, spec)) => {
                    // Only schemas specified by id are tracked.
                    if let SchemaSpecifier::Id(id) = spec {
                        dropped_schemas.insert(id);
                    }
                }
                ObjectId::Cluster(id) => {
                    clusters_to_drop.insert(*id);
                    // A failure to resolve the active cluster is ignored here.
                    if let Some(active_id) = self
                        .catalog()
                        .active_cluster(session)
                        .ok()
                        .map(|cluster| cluster.id())
                    {
                        if id == &active_id {
                            dropped_active_cluster = true;
                        }
                    }
                }
                ObjectId::Role(id) => {
                    let role = self.catalog().get_role(id);
                    let name = role.name();
                    dropped_roles.insert(*id, name);
                    // Revoke every membership the dropped role itself holds.
                    for (group_id, grantor_id) in &role.membership.map {
                        role_revokes.insert((*group_id, *id, *grantor_id));
                    }
                }
                ObjectId::Item(id) => {
                    // For indexes, find dependants that will outlive the index so the user can
                    // be warned about dropping an index that is in use.
                    if let Some(index) = self.catalog().get_entry(id).index() {
                        let humanizer = self.catalog().for_session(session);
                        let dependants = self
                            .controller
                            .compute
                            .collection_reverse_dependencies(index.cluster_id, index.global_id())
                            .ok()
                            .into_iter()
                            .flatten()
                            .filter(|dependant_id| {
                                // Transient collections are not reported.
                                if dependant_id.is_transient() {
                                    return false;
                                }
                                // Skip dependants that do not resolve to a catalog item.
                                let Some(dependent_id) = humanizer
                                    .try_get_item_by_global_id(dependant_id)
                                    .map(|item| item.id())
                                else {
                                    return false;
                                };
                                // Dependants dropped in the same request are not reported.
                                !ids_set.contains(&ObjectId::Item(dependent_id))
                            })
                            // Dependants without a human-readable name are left out.
                            .flat_map(|dependant_id| {
                                humanizer.humanize_id(dependant_id)
                            })
                            .collect_vec();
                        if !dependants.is_empty() {
                            dropped_in_use_indexes.push(DroppedInUseIndex {
                                index_name: humanizer
                                    .humanize_id(index.global_id())
                                    .unwrap_or_else(|| id.to_string()),
                                dependant_objects: dependants,
                            });
                        }
                    }
                }
                _ => {}
            }
        }

        // Second pass: replicas of a managed cluster may only be dropped together with their
        // cluster, unless the replica's location is internal. This runs after the first pass
        // so that `clusters_to_drop` is complete.
        for id in &ids {
            match id {
                ObjectId::ClusterReplica((cluster_id, replica_id)) => {
                    if !clusters_to_drop.contains(cluster_id) {
                        let cluster = self.catalog.get_cluster(*cluster_id);
                        if cluster.is_managed() {
                            let replica =
                                cluster.replica(*replica_id).expect("Catalog out of sync");
                            if !replica.config.location.internal() {
                                coord_bail!("cannot drop replica of managed cluster");
                            }
                        }
                    }
                }
                _ => {}
            }
        }

        for role_id in dropped_roles.keys() {
            self.catalog().ensure_not_reserved_role(role_id)?;
        }
        self.validate_dropped_role_ownership(session, &dropped_roles)?;
        let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
        // Revoke every membership that user roles hold in a dropped role.
        for role in self.catalog().user_roles() {
            for dropped_role_id in
                dropped_role_ids.intersection(&role.membership.map.keys().collect())
            {
                role_revokes.insert((
                    **dropped_role_id,
                    role.id(),
                    *role
                        .membership
                        .map
                        .get(*dropped_role_id)
                        .expect("included in keys above"),
                ));
            }
        }

        // Revoke default privileges scoped to a database or schema that is being dropped.
        for (default_privilege_object, default_privilege_acls) in
            self.catalog().default_privileges()
        {
            if matches!(&default_privilege_object.database_id, Some(database_id) if dropped_databases.contains(database_id))
                || matches!(&default_privilege_object.schema_id, Some(schema_id) if dropped_schemas.contains(schema_id))
            {
                for default_privilege_acl in default_privilege_acls {
                    default_privilege_revokes.insert((
                        default_privilege_object.clone(),
                        default_privilege_acl.clone(),
                    ));
                }
            }
        }

        // Revokes are emitted ahead of the single op that drops the objects themselves.
        let ops = role_revokes
            .into_iter()
            .map(|(role_id, member_id, grantor_id)| catalog::Op::RevokeRole {
                role_id,
                member_id,
                grantor_id,
            })
            .chain(default_privilege_revokes.into_iter().map(
                |(privilege_object, privilege_acl_item)| catalog::Op::UpdateDefaultPrivilege {
                    privilege_object,
                    privilege_acl_item,
                    variant: UpdatePrivilegeVariant::Revoke,
                },
            ))
            .chain(iter::once(catalog::Op::DropObjects(
                ids.into_iter()
                    .map(DropObjectInfo::manual_drop_from_object_id)
                    .collect(),
            )))
            .collect();

        Ok(DropOps {
            ops,
            dropped_active_db,
            dropped_active_cluster,
            dropped_in_use_indexes,
        })
    }
2029
2030 pub(super) fn sequence_explain_schema(
2031 &self,
2032 ExplainSinkSchemaPlan { json_schema, .. }: ExplainSinkSchemaPlan,
2033 ) -> Result<ExecuteResponse, AdapterError> {
2034 let json_value: serde_json::Value = serde_json::from_str(&json_schema).map_err(|e| {
2035 AdapterError::Explain(mz_repr::explain::ExplainError::SerdeJsonError(e))
2036 })?;
2037
2038 let json_string = json_string(&json_value);
2039 let row = Row::pack_slice(&[Datum::String(&json_string)]);
2040 Ok(Self::send_immediate_rows(row))
2041 }
2042
2043 pub(super) fn sequence_show_all_variables(
2044 &self,
2045 session: &Session,
2046 ) -> Result<ExecuteResponse, AdapterError> {
2047 let mut rows = viewable_variables(self.catalog().state(), session)
2048 .map(|v| (v.name(), v.value(), v.description()))
2049 .collect::<Vec<_>>();
2050 rows.sort_by_cached_key(|(name, _, _)| name.to_lowercase());
2051
2052 let rows: Vec<_> = rows
2054 .into_iter()
2055 .map(|(name, val, desc)| {
2056 Row::pack_slice(&[
2057 Datum::String(name),
2058 Datum::String(&val),
2059 Datum::String(desc),
2060 ])
2061 })
2062 .collect();
2063 Ok(Self::send_immediate_rows(rows))
2064 }
2065
2066 pub(super) fn sequence_show_variable(
2067 &self,
2068 session: &Session,
2069 plan: plan::ShowVariablePlan,
2070 ) -> Result<ExecuteResponse, AdapterError> {
2071 if &plan.name == SCHEMA_ALIAS {
2072 let schemas = self.catalog.resolve_search_path(session);
2073 let schema = schemas.first();
2074 return match schema {
2075 Some((database_spec, schema_spec)) => {
2076 let schema_name = &self
2077 .catalog
2078 .get_schema(database_spec, schema_spec, session.conn_id())
2079 .name()
2080 .schema;
2081 let row = Row::pack_slice(&[Datum::String(schema_name)]);
2082 Ok(Self::send_immediate_rows(row))
2083 }
2084 None => {
2085 if session.vars().current_object_missing_warnings() {
2086 session.add_notice(AdapterNotice::NoResolvableSearchPathSchema {
2087 search_path: session
2088 .vars()
2089 .search_path()
2090 .into_iter()
2091 .map(|schema| schema.to_string())
2092 .collect(),
2093 });
2094 }
2095 Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::Null])))
2096 }
2097 };
2098 }
2099
2100 let variable = session
2101 .vars()
2102 .get(self.catalog().system_config(), &plan.name)
2103 .or_else(|_| self.catalog().system_config().get(&plan.name))?;
2104
2105 variable.visible(session.user(), self.catalog().system_config())?;
2108
2109 let row = Row::pack_slice(&[Datum::String(&variable.value())]);
2110 if variable.name() == vars::DATABASE.name()
2111 && matches!(
2112 self.catalog().resolve_database(&variable.value()),
2113 Err(CatalogError::UnknownDatabase(_))
2114 )
2115 && session.vars().current_object_missing_warnings()
2116 {
2117 let name = variable.value();
2118 session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
2119 } else if variable.name() == vars::CLUSTER.name()
2120 && matches!(
2121 self.catalog().resolve_cluster(&variable.value()),
2122 Err(CatalogError::UnknownCluster(_))
2123 )
2124 && session.vars().current_object_missing_warnings()
2125 {
2126 let name = variable.value();
2127 session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
2128 }
2129 Ok(Self::send_immediate_rows(row))
2130 }
2131
2132 #[instrument]
2133 pub(super) async fn sequence_inspect_shard(
2134 &self,
2135 session: &Session,
2136 plan: plan::InspectShardPlan,
2137 ) -> Result<ExecuteResponse, AdapterError> {
2138 if !session.user().is_internal() {
2141 return Err(AdapterError::Unauthorized(
2142 rbac::UnauthorizedError::MzSystem {
2143 action: "inspect".into(),
2144 },
2145 ));
2146 }
2147 let state = self
2148 .controller
2149 .storage
2150 .inspect_persist_state(plan.id)
2151 .await?;
2152 let jsonb = Jsonb::from_serde_json(state)?;
2153 Ok(Self::send_immediate_rows(jsonb.into_row()))
2154 }
2155
    /// Sequences `SET <variable>`.
    ///
    /// Setting the transaction isolation variable or the cluster variable is first validated
    /// with `validate_set_isolation_level` / `validate_set_cluster`, which reject the change
    /// once the current transaction contains operations.
    ///
    /// With explicit values the variable is set (session-local when `plan.local` is true) and
    /// notices are added for: use of the old auto-route or old catalog-server-cluster names,
    /// a database or cluster value that does not resolve in the catalog (only when
    /// missing-object warnings are enabled), and certain isolation levels. With
    /// `VariableValue::Default` the variable is reset instead.
    ///
    /// Returns `ExecuteResponse::SetVariable` with `reset: false` in both cases.
    #[instrument]
    pub(super) fn sequence_set_variable(
        &self,
        session: &mut Session,
        plan: plan::SetVariablePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let (name, local) = (plan.name, plan.local);
        if &name == TRANSACTION_ISOLATION_VAR_NAME {
            self.validate_set_isolation_level(session)?;
        }
        if &name == vars::CLUSTER.name() {
            self.validate_set_cluster(session)?;
        }

        let vars = session.vars_mut();
        // `None` stands for `SET ... = DEFAULT`, which is handled as a reset below.
        let values = match plan.value {
            plan::VariableValue::Default => None,
            plan::VariableValue::Values(values) => Some(values),
        };

        match values {
            Some(values) => {
                vars.set(
                    self.catalog().system_config(),
                    &name,
                    VarInput::SqlSet(&values),
                    local,
                )?;

                // Re-borrow the variables immutably now that the write is done, so notices
                // can be added to the session below.
                let vars = session.vars();

                // Notices for the old variable name and the old cluster name.
                // NOTE(review): the `OLD_` prefix suggests these are deprecated — confirm.
                if name == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
                    session.add_notice(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
                } else if name == vars::CLUSTER.name()
                    && values[0] == vars::OLD_CATALOG_SERVER_CLUSTER
                {
                    session.add_notice(AdapterNotice::IntrospectionClusterUsage);
                }

                // Warn when the new database or cluster does not exist in the catalog, and
                // about particular isolation levels.
                if name.as_str() == vars::DATABASE.name()
                    && matches!(
                        self.catalog().resolve_database(vars.database()),
                        Err(CatalogError::UnknownDatabase(_))
                    )
                    && session.vars().current_object_missing_warnings()
                {
                    let name = vars.database().to_string();
                    session.add_notice(AdapterNotice::DatabaseDoesNotExist { name });
                } else if name.as_str() == vars::CLUSTER.name()
                    && matches!(
                        self.catalog().resolve_cluster(vars.cluster()),
                        Err(CatalogError::UnknownCluster(_))
                    )
                    && session.vars().current_object_missing_warnings()
                {
                    let name = vars.cluster().to_string();
                    session.add_notice(AdapterNotice::ClusterDoesNotExist { name });
                } else if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME {
                    // The requested level is compared case-insensitively.
                    let v = values.into_first().to_lowercase();
                    if v == IsolationLevel::ReadUncommitted.as_str()
                        || v == IsolationLevel::ReadCommitted.as_str()
                        || v == IsolationLevel::RepeatableRead.as_str()
                    {
                        session.add_notice(AdapterNotice::UnimplementedIsolationLevel {
                            isolation_level: v,
                        });
                    } else if v == IsolationLevel::StrongSessionSerializable.as_str() {
                        session.add_notice(AdapterNotice::StrongSessionSerializable);
                    }
                }
            }
            None => vars.reset(self.catalog().system_config(), &name, local)?,
        }

        Ok(ExecuteResponse::SetVariable { name, reset: false })
    }
2235
2236 pub(super) fn sequence_reset_variable(
2237 &self,
2238 session: &mut Session,
2239 plan: plan::ResetVariablePlan,
2240 ) -> Result<ExecuteResponse, AdapterError> {
2241 let name = plan.name;
2242 if &name == TRANSACTION_ISOLATION_VAR_NAME {
2243 self.validate_set_isolation_level(session)?;
2244 }
2245 if &name == vars::CLUSTER.name() {
2246 self.validate_set_cluster(session)?;
2247 }
2248 session
2249 .vars_mut()
2250 .reset(self.catalog().system_config(), &name, false)?;
2251 Ok(ExecuteResponse::SetVariable { name, reset: true })
2252 }
2253
2254 pub(super) fn sequence_set_transaction(
2255 &self,
2256 session: &mut Session,
2257 plan: plan::SetTransactionPlan,
2258 ) -> Result<ExecuteResponse, AdapterError> {
2259 for mode in plan.modes {
2261 match mode {
2262 TransactionMode::AccessMode(_) => {
2263 return Err(AdapterError::Unsupported("SET TRANSACTION <access-mode>"));
2264 }
2265 TransactionMode::IsolationLevel(isolation_level) => {
2266 self.validate_set_isolation_level(session)?;
2267
2268 session.vars_mut().set(
2269 self.catalog().system_config(),
2270 TRANSACTION_ISOLATION_VAR_NAME,
2271 VarInput::Flat(&isolation_level.to_ast_string_stable()),
2272 plan.local,
2273 )?
2274 }
2275 }
2276 }
2277 Ok(ExecuteResponse::SetVariable {
2278 name: TRANSACTION_ISOLATION_VAR_NAME.to_string(),
2279 reset: false,
2280 })
2281 }
2282
2283 fn validate_set_isolation_level(&self, session: &Session) -> Result<(), AdapterError> {
2284 if session.transaction().contains_ops() {
2285 Err(AdapterError::InvalidSetIsolationLevel)
2286 } else {
2287 Ok(())
2288 }
2289 }
2290
2291 fn validate_set_cluster(&self, session: &Session) -> Result<(), AdapterError> {
2292 if session.transaction().contains_ops() {
2293 Err(AdapterError::InvalidSetCluster)
2294 } else {
2295 Ok(())
2296 }
2297 }
2298
/// Ends the session's current transaction with `action` and retires `ctx`
/// (directly, or by handing it to another component that owns it from then
/// on).
///
/// Depending on what the transaction contained, the context is either
/// retired here, handed to `submit_write`, sent over
/// `strict_serializable_reads_tx`, or sent as an
/// `ExecuteSingleStatementTransaction` message on `internal_cmd_tx`.
#[instrument]
pub(super) async fn sequence_end_transaction(
    &mut self,
    mut ctx: ExecuteContext,
    mut action: EndTransactionAction,
) {
    // A commit of a transaction that is in the `Failed` state is turned into
    // a rollback.
    if let (EndTransactionAction::Commit, TransactionStatus::Failed(_)) =
        (&action, ctx.session().transaction())
    {
        action = EndTransactionAction::Rollback;
    }
    // Build the success response up front; it is replaced by an error below
    // if ending the transaction fails.
    let response = match action {
        EndTransactionAction::Commit => Ok(PendingTxnResponse::Committed {
            params: BTreeMap::new(),
        }),
        EndTransactionAction::Rollback => Ok(PendingTxnResponse::Rolledback {
            params: BTreeMap::new(),
        }),
    };

    let result = self.sequence_end_transaction_inner(&mut ctx, action).await;

    let (response, action) = match result {
        // No writes left to apply: fall through and respond immediately.
        Ok((Some(TransactionOps::Writes(writes)), _)) if writes.is_empty() => {
            (response, action)
        }
        Ok((Some(TransactionOps::Writes(writes)), write_lock_guards)) => {
            // If the transaction carried write locks, they must validate
            // against the ids being written; a mismatch is logged as a
            // programming error and reported as `WrongSetOfLocks`.
            let validated_locks = match write_lock_guards {
                None => None,
                Some(locks) => match locks.validate(writes.iter().map(|op| op.id)) {
                    Ok(locks) => Some(locks),
                    Err(missing) => {
                        tracing::error!(?missing, "programming error, missing write locks");
                        return ctx.retire(Err(AdapterError::WrongSetOfLocks));
                    }
                },
            };

            // Group the row batches of all write ops by the item they target.
            let mut collected_writes: BTreeMap<CatalogItemId, SmallVec<_>> = BTreeMap::new();
            for WriteOp { id, rows } in writes {
                let total_rows = collected_writes.entry(id).or_default();
                total_rows.push(rows);
            }

            // `ctx` moves into the pending write, so nothing is retired here.
            self.submit_write(PendingWriteTxn::User {
                span: Span::current(),
                writes: collected_writes,
                write_locks: validated_locks,
                pending_txn: PendingTxn {
                    ctx,
                    response,
                    action,
                },
            });
            return;
        }
        // Peeks that require linearization under strict serializable
        // isolation: hand the pending transaction (including `ctx`) to the
        // receiver of `strict_serializable_reads_tx`.
        Ok((
            Some(TransactionOps::Peeks {
                determination,
                requires_linearization: RequireLinearization::Required,
                ..
            }),
            _,
        )) if ctx.session().vars().transaction_isolation()
            == &IsolationLevel::StrictSerializable =>
        {
            let conn_id = ctx.session().conn_id().clone();
            let pending_read_txn = PendingReadTxn {
                txn: PendingRead::Read {
                    txn: PendingTxn {
                        ctx,
                        response,
                        action,
                    },
                },
                timestamp_context: determination.timestamp_context,
                created: Instant::now(),
                num_requeues: 0,
                otel_ctx: OpenTelemetryContext::obtain(),
            };
            self.strict_serializable_reads_tx
                .send((conn_id, pending_read_txn))
                .expect("sending to strict_serializable_reads_tx cannot fail");
            return;
        }
        // Same kind of peeks under strong session serializable isolation:
        // record the read timestamp in the session's timestamp oracle for the
        // timeline (when the timestamp context has one), then respond.
        Ok((
            Some(TransactionOps::Peeks {
                determination,
                requires_linearization: RequireLinearization::Required,
                ..
            }),
            _,
        )) if ctx.session().vars().transaction_isolation()
            == &IsolationLevel::StrongSessionSerializable =>
        {
            if let Some((timeline, ts)) = determination.timestamp_context.timeline_timestamp() {
                ctx.session_mut()
                    .ensure_timestamp_oracle(timeline.clone())
                    .apply_write(*ts);
            }
            (response, action)
        }
        // A single-statement transaction is forwarded as an internal command;
        // `ctx` moves into the message.
        Ok((Some(TransactionOps::SingleStatement { stmt, params }), _)) => {
            self.internal_cmd_tx
                .send(Message::ExecuteSingleStatementTransaction {
                    ctx,
                    otel_ctx: OpenTelemetryContext::obtain(),
                    stmt,
                    params,
                })
                .expect("must send");
            return;
        }
        Ok((_, _)) => (response, action),
        // Ending the transaction failed: report the error and treat the
        // transaction as rolled back.
        Err(err) => (Err(err), EndTransactionAction::Rollback),
    };
    // Finish the transaction on the session variables and attach whatever
    // `end_transaction` reports as changed to a successful response.
    let changed = ctx.session_mut().vars_mut().end_transaction(action);
    let response = response.map(|mut r| {
        r.extend_params(changed);
        ExecuteResponse::from(r)
    });

    ctx.retire(response);
}
2428
2429 #[instrument]
2430 async fn sequence_end_transaction_inner(
2431 &mut self,
2432 ctx: &mut ExecuteContext,
2433 action: EndTransactionAction,
2434 ) -> Result<(Option<TransactionOps<Timestamp>>, Option<WriteLocks>), AdapterError> {
2435 let txn = self.clear_transaction(ctx.session_mut()).await;
2436
2437 if let EndTransactionAction::Commit = action {
2438 if let (Some(mut ops), write_lock_guards) = txn.into_ops_and_lock_guard() {
2439 match &mut ops {
2440 TransactionOps::Writes(writes) => {
2441 for WriteOp { id, .. } in &mut writes.iter() {
2442 let _ = self.catalog().try_get_entry(id).ok_or_else(|| {
2444 AdapterError::Catalog(mz_catalog::memory::error::Error {
2445 kind: mz_catalog::memory::error::ErrorKind::Sql(
2446 CatalogError::UnknownItem(id.to_string()),
2447 ),
2448 })
2449 })?;
2450 }
2451
2452 writes.retain(|WriteOp { rows, .. }| !rows.is_empty());
2454 }
2455 TransactionOps::DDL {
2456 ops,
2457 state: _,
2458 side_effects,
2459 revision,
2460 } => {
2461 if *revision != self.catalog().transient_revision() {
2463 return Err(AdapterError::DDLTransactionRace);
2464 }
2465 let ops = std::mem::take(ops);
2467 let side_effects = std::mem::take(side_effects);
2468 self.catalog_transact_with_side_effects(
2469 Some(ctx),
2470 ops,
2471 move |a, mut ctx| {
2472 Box::pin(async move {
2473 for side_effect in side_effects {
2474 side_effect(a, ctx.as_mut().map(|ctx| &mut **ctx)).await;
2475 }
2476 })
2477 },
2478 )
2479 .await?;
2480 }
2481 _ => (),
2482 }
2483 return Ok((Some(ops), write_lock_guards));
2484 }
2485 }
2486
2487 Ok((None, None))
2488 }
2489
2490 pub(super) async fn sequence_side_effecting_func(
2491 &mut self,
2492 ctx: ExecuteContext,
2493 plan: SideEffectingFunc,
2494 ) {
2495 match plan {
2496 SideEffectingFunc::PgCancelBackend { connection_id } => {
2497 if ctx.session().conn_id().unhandled() == connection_id {
2498 ctx.retire(Err(AdapterError::Canceled));
2502 return;
2503 }
2504
2505 let res = if let Some((id_handle, _conn_meta)) =
2506 self.active_conns.get_key_value(&connection_id)
2507 {
2508 self.handle_privileged_cancel(id_handle.clone()).await;
2510 Datum::True
2511 } else {
2512 Datum::False
2513 };
2514 ctx.retire(Ok(Self::send_immediate_rows(Row::pack_slice(&[res]))));
2515 }
2516 }
2517 }
2518
2519 pub(super) async fn determine_real_time_recent_timestamp(
2522 &self,
2523 session: &Session,
2524 source_ids: impl Iterator<Item = CatalogItemId>,
2525 ) -> Result<Option<BoxFuture<'static, Result<Timestamp, StorageError<Timestamp>>>>, AdapterError>
2526 {
2527 let vars = session.vars();
2528
2529 let r = if vars.real_time_recency()
2534 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
2535 && !session.contains_read_timestamp()
2536 {
2537 let mut to_visit = VecDeque::from_iter(source_ids.filter(CatalogItemId::is_user));
2543 if to_visit.is_empty() {
2546 return Ok(None);
2547 }
2548
2549 let mut timestamp_objects = BTreeSet::new();
2550
2551 while let Some(id) = to_visit.pop_front() {
2552 timestamp_objects.insert(id);
2553 to_visit.extend(
2554 self.catalog()
2555 .get_entry(&id)
2556 .uses()
2557 .into_iter()
2558 .filter(|id| !timestamp_objects.contains(id) && id.is_user()),
2559 );
2560 }
2561 let timestamp_objects = timestamp_objects
2562 .into_iter()
2563 .flat_map(|item_id| self.catalog().get_entry(&item_id).global_ids())
2564 .collect();
2565
2566 let r = self
2567 .controller
2568 .determine_real_time_recent_timestamp(
2569 timestamp_objects,
2570 *vars.real_time_recency_timeout(),
2571 )
2572 .await?;
2573
2574 Some(r)
2575 } else {
2576 None
2577 };
2578
2579 Ok(r)
2580 }
2581
2582 #[instrument]
2583 pub(super) async fn sequence_explain_plan(
2584 &mut self,
2585 ctx: ExecuteContext,
2586 plan: plan::ExplainPlanPlan,
2587 target_cluster: TargetCluster,
2588 ) {
2589 match &plan.explainee {
2590 plan::Explainee::Statement(stmt) => match stmt {
2591 plan::ExplaineeStatement::CreateView { .. } => {
2592 self.explain_create_view(ctx, plan).await;
2593 }
2594 plan::ExplaineeStatement::CreateMaterializedView { .. } => {
2595 self.explain_create_materialized_view(ctx, plan).await;
2596 }
2597 plan::ExplaineeStatement::CreateIndex { .. } => {
2598 self.explain_create_index(ctx, plan).await;
2599 }
2600 plan::ExplaineeStatement::Select { .. } => {
2601 self.explain_peek(ctx, plan, target_cluster).await;
2602 }
2603 },
2604 plan::Explainee::View(_) => {
2605 let result = self.explain_view(&ctx, plan);
2606 ctx.retire(result);
2607 }
2608 plan::Explainee::MaterializedView(_) => {
2609 let result = self.explain_materialized_view(&ctx, plan);
2610 ctx.retire(result);
2611 }
2612 plan::Explainee::Index(_) => {
2613 let result = self.explain_index(&ctx, plan);
2614 ctx.retire(result);
2615 }
2616 plan::Explainee::ReplanView(_) => {
2617 self.explain_replan_view(ctx, plan).await;
2618 }
2619 plan::Explainee::ReplanMaterializedView(_) => {
2620 self.explain_replan_materialized_view(ctx, plan).await;
2621 }
2622 plan::Explainee::ReplanIndex(_) => {
2623 self.explain_replan_index(ctx, plan).await;
2624 }
2625 };
2626 }
2627
2628 pub(super) async fn sequence_explain_pushdown(
2629 &mut self,
2630 ctx: ExecuteContext,
2631 plan: plan::ExplainPushdownPlan,
2632 target_cluster: TargetCluster,
2633 ) {
2634 match plan.explainee {
2635 Explainee::Statement(ExplaineeStatement::Select {
2636 broken: false,
2637 plan,
2638 desc: _,
2639 }) => {
2640 let stage = return_if_err!(
2641 self.peek_validate(
2642 ctx.session(),
2643 plan,
2644 target_cluster,
2645 None,
2646 ExplainContext::Pushdown,
2647 Some(ctx.session().vars().max_query_result_size()),
2648 ),
2649 ctx
2650 );
2651 self.sequence_staged(ctx, Span::current(), stage).await;
2652 }
2653 Explainee::MaterializedView(item_id) => {
2654 self.explain_pushdown_materialized_view(ctx, item_id).await;
2655 }
2656 _ => {
2657 ctx.retire(Err(AdapterError::Unsupported(
2658 "EXPLAIN FILTER PUSHDOWN queries for this explainee type",
2659 )));
2660 }
2661 };
2662 }
2663
2664 async fn render_explain_pushdown(
2665 &self,
2666 ctx: ExecuteContext,
2667 as_of: Antichain<Timestamp>,
2668 mz_now: ResultSpec<'static>,
2669 read_holds: Option<ReadHolds<Timestamp>>,
2670 imports: impl IntoIterator<Item = (GlobalId, MapFilterProject)> + 'static,
2671 ) {
2672 let fut = self
2673 .render_explain_pushdown_prepare(ctx.session(), as_of, mz_now, imports)
2674 .await;
2675 task::spawn(|| "render explain pushdown", async move {
2676 let _read_holds = read_holds;
2678 let res = fut.await;
2679 ctx.retire(res);
2680 });
2681 }
2682
2683 async fn render_explain_pushdown_prepare<
2684 I: IntoIterator<Item = (GlobalId, MapFilterProject)>,
2685 >(
2686 &self,
2687 session: &Session,
2688 as_of: Antichain<Timestamp>,
2689 mz_now: ResultSpec<'static>,
2690 imports: I,
2691 ) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
2692 let explain_timeout = *session.vars().statement_timeout();
2693 let mut futures = FuturesOrdered::new();
2694 for (id, mfp) in imports {
2695 let catalog_entry = self.catalog.get_entry_by_global_id(&id);
2696 let full_name = self
2697 .catalog
2698 .for_session(session)
2699 .resolve_full_name(&catalog_entry.name);
2700 let name = format!("{}", full_name);
2701 let relation_desc = catalog_entry
2702 .desc_opt()
2703 .expect("source should have a proper desc")
2704 .into_owned();
2705 let stats_future = self
2706 .controller
2707 .storage_collections
2708 .snapshot_parts_stats(id, as_of.clone())
2709 .await;
2710
2711 let mz_now = mz_now.clone();
2712 futures.push_back(async move {
2715 let snapshot_stats = match stats_future.await {
2716 Ok(stats) => stats,
2717 Err(e) => return Err(e),
2718 };
2719 let mut total_bytes = 0;
2720 let mut total_parts = 0;
2721 let mut selected_bytes = 0;
2722 let mut selected_parts = 0;
2723 for SnapshotPartStats {
2724 encoded_size_bytes: bytes,
2725 stats,
2726 } in &snapshot_stats.parts
2727 {
2728 let bytes = u64::cast_from(*bytes);
2729 total_bytes += bytes;
2730 total_parts += 1u64;
2731 let selected = match stats {
2732 None => true,
2733 Some(stats) => {
2734 let stats = stats.decode();
2735 let stats = RelationPartStats::new(
2736 name.as_str(),
2737 &snapshot_stats.metrics.pushdown.part_stats,
2738 &relation_desc,
2739 &stats,
2740 );
2741 stats.may_match_mfp(mz_now.clone(), &mfp)
2742 }
2743 };
2744
2745 if selected {
2746 selected_bytes += bytes;
2747 selected_parts += 1u64;
2748 }
2749 }
2750 Ok(Row::pack_slice(&[
2751 name.as_str().into(),
2752 total_bytes.into(),
2753 selected_bytes.into(),
2754 total_parts.into(),
2755 selected_parts.into(),
2756 ]))
2757 });
2758 }
2759
2760 let fut = async move {
2761 match tokio::time::timeout(
2762 explain_timeout,
2763 futures::TryStreamExt::try_collect::<Vec<_>>(futures),
2764 )
2765 .await
2766 {
2767 Ok(Ok(rows)) => Ok(ExecuteResponse::SendingRowsImmediate {
2768 rows: Box::new(rows.into_row_iter()),
2769 }),
2770 Ok(Err(err)) => Err(err.into()),
2771 Err(_) => Err(AdapterError::StatementTimeout),
2772 }
2773 };
2774 fut
2775 }
2776
2777 #[instrument]
2778 pub(super) async fn sequence_insert(
2779 &mut self,
2780 mut ctx: ExecuteContext,
2781 plan: plan::InsertPlan,
2782 ) {
2783 if !ctx.session_mut().transaction().allows_writes() {
2791 ctx.retire(Err(AdapterError::ReadOnlyTransaction));
2792 return;
2793 }
2794
2795 let optimized_mir = if let Some(..) = &plan.values.as_const() {
2809 let expr = return_if_err!(
2812 plan.values
2813 .clone()
2814 .lower(self.catalog().system_config(), None),
2815 ctx
2816 );
2817 OptimizedMirRelationExpr(expr)
2818 } else {
2819 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
2821
2822 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
2824
2825 return_if_err!(optimizer.optimize(plan.values.clone()), ctx)
2827 };
2828
2829 match optimized_mir.into_inner() {
2830 selection if selection.as_const().is_some() && plan.returning.is_empty() => {
2831 let catalog = self.owned_catalog();
2832 mz_ore::task::spawn(|| "coord::sequence_inner", async move {
2833 let result =
2834 Self::insert_constant(&catalog, ctx.session_mut(), plan.id, selection);
2835 ctx.retire(result);
2836 });
2837 }
2838 _ => {
2840 let desc_arity = match self.catalog().try_get_entry(&plan.id) {
2841 Some(table) => {
2842 let fullname = self
2843 .catalog()
2844 .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
2845 table
2847 .desc_latest(&fullname)
2848 .expect("desc called on table")
2849 .arity()
2850 }
2851 None => {
2852 ctx.retire(Err(AdapterError::Catalog(
2853 mz_catalog::memory::error::Error {
2854 kind: mz_catalog::memory::error::ErrorKind::Sql(
2855 CatalogError::UnknownItem(plan.id.to_string()),
2856 ),
2857 },
2858 )));
2859 return;
2860 }
2861 };
2862
2863 let finishing = RowSetFinishing {
2864 order_by: vec![],
2865 limit: None,
2866 offset: 0,
2867 project: (0..desc_arity).collect(),
2868 };
2869
2870 let read_then_write_plan = plan::ReadThenWritePlan {
2871 id: plan.id,
2872 selection: plan.values,
2873 finishing,
2874 assignments: BTreeMap::new(),
2875 kind: MutationKind::Insert,
2876 returning: plan.returning,
2877 };
2878
2879 self.sequence_read_then_write(ctx, read_then_write_plan)
2880 .await;
2881 }
2882 }
2883 }
2884
2885 #[instrument]
2890 pub(super) async fn sequence_read_then_write(
2891 &mut self,
2892 mut ctx: ExecuteContext,
2893 plan: plan::ReadThenWritePlan,
2894 ) {
2895 let mut source_ids: BTreeSet<_> = plan
2896 .selection
2897 .depends_on()
2898 .into_iter()
2899 .map(|gid| self.catalog().resolve_item_id(&gid))
2900 .collect();
2901 source_ids.insert(plan.id);
2902
2903 if ctx.session().transaction().write_locks().is_none() {
2905 let mut write_locks = WriteLocks::builder(source_ids.iter().copied());
2907
2908 for id in &source_ids {
2910 if let Some(lock) = self.try_grant_object_write_lock(*id) {
2911 write_locks.insert_lock(*id, lock);
2912 }
2913 }
2914
2915 let write_locks = match write_locks.all_or_nothing(ctx.session().conn_id()) {
2917 Ok(locks) => locks,
2918 Err(missing) => {
2919 let role_metadata = ctx.session().role_metadata().clone();
2921 let acquire_future = self.grant_object_write_lock(missing).map(Option::Some);
2922 let plan = DeferredPlan {
2923 ctx,
2924 plan: Plan::ReadThenWrite(plan),
2925 validity: PlanValidity::new(
2926 self.catalog.transient_revision(),
2927 source_ids.clone(),
2928 None,
2929 None,
2930 role_metadata,
2931 ),
2932 requires_locks: source_ids,
2933 };
2934 return self.defer_op(acquire_future, DeferredOp::Plan(plan));
2935 }
2936 };
2937
2938 ctx.session_mut()
2939 .try_grant_write_locks(write_locks)
2940 .expect("session has already been granted write locks");
2941 }
2942
2943 let plan::ReadThenWritePlan {
2944 id,
2945 kind,
2946 selection,
2947 mut assignments,
2948 finishing,
2949 returning,
2950 } = plan;
2951
2952 let desc = match self.catalog().try_get_entry(&id) {
2954 Some(table) => {
2955 let full_name = self
2956 .catalog()
2957 .resolve_full_name(table.name(), Some(ctx.session().conn_id()));
2958 table
2960 .desc_latest(&full_name)
2961 .expect("desc called on table")
2962 .into_owned()
2963 }
2964 None => {
2965 ctx.retire(Err(AdapterError::Catalog(
2966 mz_catalog::memory::error::Error {
2967 kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
2968 id.to_string(),
2969 )),
2970 },
2971 )));
2972 return;
2973 }
2974 };
2975
2976 let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
2978 || assignments.values().any(|e| e.contains_temporal())
2979 || returning.iter().any(|e| e.contains_temporal());
2980 if contains_temporal {
2981 ctx.retire(Err(AdapterError::Unsupported(
2982 "calls to mz_now in write statements",
2983 )));
2984 return;
2985 }
2986
2987 fn validate_read_dependencies(
2995 catalog: &Catalog,
2996 id: &CatalogItemId,
2997 ) -> Result<(), AdapterError> {
2998 use CatalogItemType::*;
2999 use mz_catalog::memory::objects;
3000 let mut ids_to_check = Vec::new();
3001 let valid = match catalog.try_get_entry(id) {
3002 Some(entry) => {
3003 if let CatalogItem::View(objects::View { optimized_expr, .. })
3004 | CatalogItem::MaterializedView(objects::MaterializedView {
3005 optimized_expr,
3006 ..
3007 }) = entry.item()
3008 {
3009 if optimized_expr.contains_temporal() {
3010 return Err(AdapterError::Unsupported(
3011 "calls to mz_now in write statements",
3012 ));
3013 }
3014 }
3015 match entry.item().typ() {
3016 typ @ (Func | View | MaterializedView | ContinualTask) => {
3017 ids_to_check.extend(entry.uses());
3018 let valid_id = id.is_user() || matches!(typ, Func);
3019 valid_id
3020 }
3021 Source | Secret | Connection => false,
3022 Sink | Index => unreachable!(),
3024 Table => {
3025 if !id.is_user() {
3026 false
3028 } else {
3029 entry.source_export_details().is_none()
3031 }
3032 }
3033 Type => true,
3034 }
3035 }
3036 None => false,
3037 };
3038 if !valid {
3039 return Err(AdapterError::InvalidTableMutationSelection);
3040 }
3041 for id in ids_to_check {
3042 validate_read_dependencies(catalog, &id)?;
3043 }
3044 Ok(())
3045 }
3046
3047 for gid in selection.depends_on() {
3048 let item_id = self.catalog().resolve_item_id(&gid);
3049 if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) {
3050 ctx.retire(Err(err));
3051 return;
3052 }
3053 }
3054
3055 let (peek_tx, peek_rx) = oneshot::channel();
3056 let peek_client_tx = ClientTransmitter::new(peek_tx, self.internal_cmd_tx.clone());
3057 let (tx, _, session, extra) = ctx.into_parts();
3058 let peek_ctx = ExecuteContext::from_parts(
3070 peek_client_tx,
3071 self.internal_cmd_tx.clone(),
3072 session,
3073 Default::default(),
3074 );
3075
3076 self.sequence_peek(
3077 peek_ctx,
3078 plan::SelectPlan {
3079 select: None,
3080 source: selection,
3081 when: QueryWhen::FreshestTableWrite,
3082 finishing,
3083 copy_to: None,
3084 },
3085 TargetCluster::Active,
3086 None,
3087 )
3088 .await;
3089
3090 let internal_cmd_tx = self.internal_cmd_tx.clone();
3091 let strict_serializable_reads_tx = self.strict_serializable_reads_tx.clone();
3092 let catalog = self.owned_catalog();
3093 let max_result_size = self.catalog().system_config().max_result_size();
3094
3095 task::spawn(|| format!("sequence_read_then_write:{id}"), async move {
3096 let (peek_response, session) = match peek_rx.await {
3097 Ok(Response {
3098 result: Ok(resp),
3099 session,
3100 otel_ctx,
3101 }) => {
3102 otel_ctx.attach_as_parent();
3103 (resp, session)
3104 }
3105 Ok(Response {
3106 result: Err(e),
3107 session,
3108 otel_ctx,
3109 }) => {
3110 let ctx =
3111 ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
3112 otel_ctx.attach_as_parent();
3113 ctx.retire(Err(e));
3114 return;
3115 }
3116 Err(e) => return warn!("internal_cmd_rx dropped before we could send: {:?}", e),
3118 };
3119 let mut ctx = ExecuteContext::from_parts(tx, internal_cmd_tx.clone(), session, extra);
3120 let mut timeout_dur = *ctx.session().vars().statement_timeout();
3121
3122 if timeout_dur == Duration::ZERO {
3124 timeout_dur = Duration::MAX;
3125 }
3126
3127 let style = ExprPrepStyle::OneShot {
3128 logical_time: EvalTime::NotAvailable,
3129 session: ctx.session(),
3130 catalog_state: catalog.state(),
3131 };
3132 for expr in assignments.values_mut() {
3133 return_if_err!(prep_scalar_expr(expr, style.clone()), ctx);
3134 }
3135
3136 let make_diffs =
3137 move |mut rows: Box<dyn RowIterator>| -> Result<(Vec<(Row, Diff)>, u64), AdapterError> {
3138 let arena = RowArena::new();
3139 let mut diffs = Vec::new();
3140 let mut datum_vec = mz_repr::DatumVec::new();
3141
3142 while let Some(row) = rows.next() {
3143 if !assignments.is_empty() {
3144 assert!(
3145 matches!(kind, MutationKind::Update),
3146 "only updates support assignments"
3147 );
3148 let mut datums = datum_vec.borrow_with(row);
3149 let mut updates = vec![];
3150 for (idx, expr) in &assignments {
3151 let updated = match expr.eval(&datums, &arena) {
3152 Ok(updated) => updated,
3153 Err(e) => return Err(AdapterError::Unstructured(anyhow!(e))),
3154 };
3155 updates.push((*idx, updated));
3156 }
3157 for (idx, new_value) in updates {
3158 datums[idx] = new_value;
3159 }
3160 let updated = Row::pack_slice(&datums);
3161 diffs.push((updated, Diff::ONE));
3162 }
3163 match kind {
3164 MutationKind::Update | MutationKind::Delete => {
3168 diffs.push((row.to_owned(), Diff::MINUS_ONE))
3169 }
3170 MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
3171 }
3172 }
3173
3174 let mut byte_size: u64 = 0;
3177 for (row, diff) in &diffs {
3178 byte_size = byte_size.saturating_add(u64::cast_from(row.byte_len()));
3179 if diff.is_positive() {
3180 for (idx, datum) in row.iter().enumerate() {
3181 desc.constraints_met(idx, &datum)?;
3182 }
3183 }
3184 }
3185 Ok((diffs, byte_size))
3186 };
3187
3188 let diffs = match peek_response {
3189 ExecuteResponse::SendingRowsStreaming {
3190 rows: mut rows_stream,
3191 ..
3192 } => {
3193 let mut byte_size: u64 = 0;
3194 let mut diffs = Vec::new();
3195 let result = loop {
3196 match tokio::time::timeout(timeout_dur, rows_stream.next()).await {
3197 Ok(Some(res)) => match res {
3198 PeekResponseUnary::Rows(new_rows) => {
3199 match make_diffs(new_rows) {
3200 Ok((mut new_diffs, new_byte_size)) => {
3201 byte_size = byte_size.saturating_add(new_byte_size);
3202 if byte_size > max_result_size {
3203 break Err(AdapterError::ResultSize(format!(
3204 "result exceeds max size of {max_result_size}"
3205 )));
3206 }
3207 diffs.append(&mut new_diffs)
3208 }
3209 Err(e) => break Err(e),
3210 };
3211 }
3212 PeekResponseUnary::Canceled => break Err(AdapterError::Canceled),
3213 PeekResponseUnary::Error(e) => {
3214 break Err(AdapterError::Unstructured(anyhow!(e)));
3215 }
3216 },
3217 Ok(None) => break Ok(diffs),
3218 Err(_) => {
3219 let result = internal_cmd_tx.send(Message::CancelPendingPeeks {
3224 conn_id: ctx.session().conn_id().clone(),
3225 });
3226 if let Err(e) = result {
3227 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3228 }
3229 break Err(AdapterError::StatementTimeout);
3230 }
3231 }
3232 };
3233
3234 result
3235 }
3236 ExecuteResponse::SendingRowsImmediate { rows } => {
3237 make_diffs(rows).map(|(diffs, _byte_size)| diffs)
3238 }
3239 resp => Err(AdapterError::Unstructured(anyhow!(
3240 "unexpected peek response: {resp:?}"
3241 ))),
3242 };
3243
3244 let mut returning_rows = Vec::new();
3245 let mut diff_err: Option<AdapterError> = None;
3246 if let (false, Ok(diffs)) = (returning.is_empty(), &diffs) {
3247 let arena = RowArena::new();
3248 for (row, diff) in diffs {
3249 if !diff.is_positive() {
3250 continue;
3251 }
3252 let mut returning_row = Row::with_capacity(returning.len());
3253 let mut packer = returning_row.packer();
3254 for expr in &returning {
3255 let datums: Vec<_> = row.iter().collect();
3256 match expr.eval(&datums, &arena) {
3257 Ok(datum) => {
3258 packer.push(datum);
3259 }
3260 Err(err) => {
3261 diff_err = Some(err.into());
3262 break;
3263 }
3264 }
3265 }
3266 let diff = NonZeroI64::try_from(diff.into_inner()).expect("known to be >= 1");
3267 let diff = match NonZeroUsize::try_from(diff) {
3268 Ok(diff) => diff,
3269 Err(err) => {
3270 diff_err = Some(err.into());
3271 break;
3272 }
3273 };
3274 returning_rows.push((returning_row, diff));
3275 if diff_err.is_some() {
3276 break;
3277 }
3278 }
3279 }
3280 let diffs = if let Some(err) = diff_err {
3281 Err(err)
3282 } else {
3283 diffs
3284 };
3285
3286 let timestamp_context = ctx.session_mut().take_transaction_timestamp_context();
3289 if let Some(timestamp_context) = timestamp_context {
3298 let (tx, rx) = tokio::sync::oneshot::channel();
3299 let conn_id = ctx.session().conn_id().clone();
3300 let pending_read_txn = PendingReadTxn {
3301 txn: PendingRead::ReadThenWrite { ctx, tx },
3302 timestamp_context,
3303 created: Instant::now(),
3304 num_requeues: 0,
3305 otel_ctx: OpenTelemetryContext::obtain(),
3306 };
3307 let result = strict_serializable_reads_tx.send((conn_id, pending_read_txn));
3308 if let Err(e) = result {
3310 warn!(
3311 "strict_serializable_reads_tx dropped before we could send: {:?}",
3312 e
3313 );
3314 return;
3315 }
3316 let result = rx.await;
3317 ctx = match result {
3319 Ok(Some(ctx)) => ctx,
3320 Ok(None) => {
3321 return;
3324 }
3325 Err(e) => {
3326 warn!(
3327 "tx used to linearize read in read then write transaction dropped before we could send: {:?}",
3328 e
3329 );
3330 return;
3331 }
3332 };
3333 }
3334
3335 match diffs {
3336 Ok(diffs) => {
3337 let result = Self::send_diffs(
3338 ctx.session_mut(),
3339 plan::SendDiffsPlan {
3340 id,
3341 updates: diffs,
3342 kind,
3343 returning: returning_rows,
3344 max_result_size,
3345 },
3346 );
3347 ctx.retire(result);
3348 }
3349 Err(e) => {
3350 ctx.retire(Err(e));
3351 }
3352 }
3353 });
3354 }
3355
3356 #[instrument]
3357 pub(super) async fn sequence_alter_item_rename(
3358 &mut self,
3359 ctx: &mut ExecuteContext,
3360 plan: plan::AlterItemRenamePlan,
3361 ) -> Result<ExecuteResponse, AdapterError> {
3362 let op = catalog::Op::RenameItem {
3363 id: plan.id,
3364 current_full_name: plan.current_full_name,
3365 to_name: plan.to_name,
3366 };
3367 match self
3368 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3369 .await
3370 {
3371 Ok(()) => Ok(ExecuteResponse::AlteredObject(plan.object_type)),
3372 Err(err) => Err(err),
3373 }
3374 }
3375
3376 #[instrument]
3377 pub(super) async fn sequence_alter_retain_history(
3378 &mut self,
3379 ctx: &mut ExecuteContext,
3380 plan: plan::AlterRetainHistoryPlan,
3381 ) -> Result<ExecuteResponse, AdapterError> {
3382 let ops = vec![catalog::Op::AlterRetainHistory {
3383 id: plan.id,
3384 value: plan.value,
3385 window: plan.window,
3386 }];
3387 self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3388 Box::pin(async move {
3389 let catalog_item = coord.catalog().get_entry(&plan.id).item();
3390 let cluster = match catalog_item {
3391 CatalogItem::Table(_)
3392 | CatalogItem::MaterializedView(_)
3393 | CatalogItem::Source(_)
3394 | CatalogItem::ContinualTask(_) => None,
3395 CatalogItem::Index(index) => Some(index.cluster_id),
3396 CatalogItem::Log(_)
3397 | CatalogItem::View(_)
3398 | CatalogItem::Sink(_)
3399 | CatalogItem::Type(_)
3400 | CatalogItem::Func(_)
3401 | CatalogItem::Secret(_)
3402 | CatalogItem::Connection(_) => unreachable!(),
3403 };
3404 match cluster {
3405 Some(cluster) => {
3406 coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3407 }
3408 None => {
3409 coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3410 }
3411 }
3412 })
3413 })
3414 .await?;
3415 Ok(ExecuteResponse::AlteredObject(plan.object_type))
3416 }
3417
3418 #[instrument]
3419 pub(super) async fn sequence_alter_schema_rename(
3420 &mut self,
3421 ctx: &mut ExecuteContext,
3422 plan: plan::AlterSchemaRenamePlan,
3423 ) -> Result<ExecuteResponse, AdapterError> {
3424 let (database_spec, schema_spec) = plan.cur_schema_spec;
3425 let op = catalog::Op::RenameSchema {
3426 database_spec,
3427 schema_spec,
3428 new_name: plan.new_schema_name,
3429 check_reserved_names: true,
3430 };
3431 match self
3432 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
3433 .await
3434 {
3435 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3436 Err(err) => Err(err),
3437 }
3438 }
3439
3440 #[instrument]
3441 pub(super) async fn sequence_alter_schema_swap(
3442 &mut self,
3443 ctx: &mut ExecuteContext,
3444 plan: plan::AlterSchemaSwapPlan,
3445 ) -> Result<ExecuteResponse, AdapterError> {
3446 let plan::AlterSchemaSwapPlan {
3447 schema_a_spec: (schema_a_db, schema_a),
3448 schema_a_name,
3449 schema_b_spec: (schema_b_db, schema_b),
3450 schema_b_name,
3451 name_temp,
3452 } = plan;
3453
3454 let op_a = catalog::Op::RenameSchema {
3455 database_spec: schema_a_db,
3456 schema_spec: schema_a,
3457 new_name: name_temp,
3458 check_reserved_names: false,
3459 };
3460 let op_b = catalog::Op::RenameSchema {
3461 database_spec: schema_b_db,
3462 schema_spec: schema_b,
3463 new_name: schema_a_name,
3464 check_reserved_names: false,
3465 };
3466 let op_c = catalog::Op::RenameSchema {
3467 database_spec: schema_a_db,
3468 schema_spec: schema_a,
3469 new_name: schema_b_name,
3470 check_reserved_names: false,
3471 };
3472
3473 match self
3474 .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_c], |_, _| {
3475 Box::pin(async {})
3476 })
3477 .await
3478 {
3479 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Schema)),
3480 Err(err) => Err(err),
3481 }
3482 }
3483
3484 #[instrument]
3485 pub(super) async fn sequence_alter_role(
3486 &mut self,
3487 session: &Session,
3488 plan::AlterRolePlan { id, name, option }: plan::AlterRolePlan,
3489 ) -> Result<ExecuteResponse, AdapterError> {
3490 let catalog = self.catalog().for_session(session);
3491 let role = catalog.get_role(&id);
3492
3493 let mut notices = vec![];
3495
3496 let mut attributes = role.attributes().clone();
3498 let mut vars = role.vars().clone();
3499
3500 match option {
3502 PlannedAlterRoleOption::Attributes(attrs) => {
3503 self.validate_role_attributes(&attrs.clone().into())?;
3504
3505 if let Some(inherit) = attrs.inherit {
3506 attributes.inherit = inherit;
3507 }
3508
3509 if let Some(password) = attrs.password {
3510 attributes.password = Some(password);
3511 }
3512
3513 if let Some(superuser) = attrs.superuser {
3514 attributes.superuser = Some(superuser);
3515 }
3516
3517 if let Some(login) = attrs.login {
3518 attributes.login = Some(login);
3519 }
3520
3521 if attrs.nopassword.unwrap_or(false) {
3522 attributes.password = None;
3523 }
3524
3525 if let Some(notice) = self.should_emit_rbac_notice(session) {
3526 notices.push(notice);
3527 }
3528 }
3529 PlannedAlterRoleOption::Variable(variable) => {
3530 let session_var = session.vars().inspect(variable.name())?;
3532 session_var.visible(session.user(), catalog.system_vars())?;
3534
3535 if variable.name() == vars::OLD_AUTO_ROUTE_CATALOG_QUERIES {
3538 notices.push(AdapterNotice::AutoRouteIntrospectionQueriesUsage);
3539 } else if let PlannedRoleVariable::Set {
3540 name,
3541 value: VariableValue::Values(vals),
3542 } = &variable
3543 {
3544 if name == vars::CLUSTER.name() && vals[0] == vars::OLD_CATALOG_SERVER_CLUSTER {
3545 notices.push(AdapterNotice::IntrospectionClusterUsage);
3546 }
3547 }
3548
3549 let var_name = match variable {
3550 PlannedRoleVariable::Set { name, value } => {
3551 match &value {
3553 VariableValue::Default => {
3554 vars.remove(&name);
3555 }
3556 VariableValue::Values(vals) => {
3557 let var = match &vals[..] {
3558 [val] => OwnedVarInput::Flat(val.clone()),
3559 vals => OwnedVarInput::SqlSet(vals.to_vec()),
3560 };
3561 session_var.check(var.borrow())?;
3563
3564 vars.insert(name.clone(), var);
3565 }
3566 };
3567 name
3568 }
3569 PlannedRoleVariable::Reset { name } => {
3570 vars.remove(&name);
3572 name
3573 }
3574 };
3575
3576 notices.push(AdapterNotice::VarDefaultUpdated {
3578 role: Some(name.clone()),
3579 var_name: Some(var_name),
3580 });
3581 }
3582 }
3583
3584 let op = catalog::Op::AlterRole {
3585 id,
3586 name,
3587 attributes,
3588 vars: RoleVars { map: vars },
3589 };
3590 let response = self
3591 .catalog_transact(Some(session), vec![op])
3592 .await
3593 .map(|_| ExecuteResponse::AlteredRole)?;
3594
3595 session.add_notices(notices);
3597
3598 Ok(response)
3599 }
3600
3601 #[instrument]
3602 pub(super) async fn sequence_alter_sink_prepare(
3603 &mut self,
3604 ctx: ExecuteContext,
3605 plan: plan::AlterSinkPlan,
3606 ) {
3607 let id_bundle = crate::CollectionIdBundle {
3609 storage_ids: BTreeSet::from_iter([plan.sink.from]),
3610 compute_ids: BTreeMap::new(),
3611 };
3612 let read_hold = self.acquire_read_holds(&id_bundle);
3613
3614 let Some(read_ts) = read_hold.least_valid_read().into_option() else {
3615 ctx.retire(Err(AdapterError::UnreadableSinkCollection));
3616 return;
3617 };
3618
3619 let otel_ctx = OpenTelemetryContext::obtain();
3620 let from_item_id = self.catalog().resolve_item_id(&plan.sink.from);
3621
3622 let plan_validity = PlanValidity::new(
3623 self.catalog().transient_revision(),
3624 BTreeSet::from_iter([from_item_id]),
3625 Some(plan.in_cluster),
3626 None,
3627 ctx.session().role_metadata().clone(),
3628 );
3629
3630 let create_sink_stmt = match mz_sql::parse::parse(&plan.sink.create_sql)
3633 .expect("invalid create sink sql")
3634 .into_element()
3635 .ast
3636 {
3637 Statement::CreateSink(stmt) => stmt,
3638 _ => unreachable!("invalid statment kind for sink"),
3639 };
3640 let catalog = self.catalog().for_system_session();
3641 let (_, resolved_ids) = match mz_sql::names::resolve(&catalog, create_sink_stmt) {
3642 Ok(ok) => ok,
3643 Err(e) => {
3644 ctx.retire(Err(AdapterError::internal("ALTER SINK", e)));
3645 return;
3646 }
3647 };
3648
3649 info!(
3650 "preparing alter sink for {}: frontiers={:?} export={:?}",
3651 plan.global_id,
3652 self.controller
3653 .storage_collections
3654 .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3655 self.controller.storage.export(plan.global_id)
3656 );
3657
3658 self.install_storage_watch_set(
3661 ctx.session().conn_id().clone(),
3662 BTreeSet::from_iter([plan.global_id]),
3663 read_ts,
3664 WatchSetResponse::AlterSinkReady(AlterSinkReadyContext {
3665 ctx: Some(ctx),
3666 otel_ctx,
3667 plan,
3668 plan_validity,
3669 resolved_ids,
3670 read_hold,
3671 }),
3672 );
3673 }
3674
3675 #[instrument]
3676 pub async fn sequence_alter_sink_finish(&mut self, mut ctx: AlterSinkReadyContext) {
3677 ctx.otel_ctx.attach_as_parent();
3678 match ctx.plan_validity.check(self.catalog()) {
3679 Ok(()) => {}
3680 Err(err) => {
3681 ctx.retire(Err(err));
3682 return;
3683 }
3684 }
3685 {
3686 let plan = &ctx.plan;
3687 info!(
3688 "finishing alter sink for {}: frontiers={:?} export={:?}",
3689 plan.global_id,
3690 self.controller
3691 .storage_collections
3692 .collections_frontiers(vec![plan.global_id, plan.sink.from]),
3693 self.controller.storage.export(plan.global_id)
3694 );
3695 }
3696
3697 let plan::AlterSinkPlan {
3698 item_id,
3699 global_id,
3700 sink,
3701 with_snapshot,
3702 in_cluster,
3703 } = ctx.plan.clone();
3704 let write_frontier = &self
3707 .controller
3708 .storage
3709 .export(global_id)
3710 .expect("sink known to exist")
3711 .write_frontier;
3712 let as_of = ctx.read_hold.least_valid_read();
3713 assert!(
3714 write_frontier.iter().all(|t| as_of.less_than(t)),
3715 "{:?} should be strictly less than {:?}",
3716 &*as_of,
3717 &**write_frontier
3718 );
3719
3720 let catalog_sink = Sink {
3721 create_sql: sink.create_sql,
3722 global_id,
3723 from: sink.from,
3724 connection: sink.connection.clone(),
3725 envelope: sink.envelope,
3726 version: sink.version,
3727 with_snapshot,
3728 resolved_ids: ctx.resolved_ids.clone(),
3729 cluster_id: in_cluster,
3730 };
3731
3732 let ops = vec![catalog::Op::UpdateItem {
3733 id: item_id,
3734 name: self.catalog.get_entry(&item_id).name().clone(),
3735 to_item: CatalogItem::Sink(catalog_sink),
3736 }];
3737
3738 match self
3739 .catalog_transact(Some(ctx.ctx().session_mut()), ops)
3740 .await
3741 {
3742 Ok(()) => {}
3743 Err(err) => {
3744 ctx.retire(Err(err));
3745 return;
3746 }
3747 }
3748
3749 let from_entry = self.catalog().get_entry_by_global_id(&sink.from);
3750 let storage_sink_desc = StorageSinkDesc {
3751 from: sink.from,
3752 from_desc: from_entry
3753 .desc_opt()
3754 .expect("sinks can only be built on items with descs")
3755 .into_owned(),
3756 connection: sink
3757 .connection
3758 .clone()
3759 .into_inline_connection(self.catalog().state()),
3760 envelope: sink.envelope,
3761 as_of,
3762 with_snapshot,
3763 version: sink.version,
3764 from_storage_metadata: (),
3765 to_storage_metadata: (),
3766 };
3767
3768 self.controller
3769 .storage
3770 .alter_export(
3771 global_id,
3772 ExportDescription {
3773 sink: storage_sink_desc,
3774 instance_id: in_cluster,
3775 },
3776 )
3777 .await
3778 .unwrap_or_terminate("cannot fail to alter source desc");
3779
3780 ctx.retire(Ok(ExecuteResponse::AlteredObject(ObjectType::Sink)));
3781 }
3782
3783 #[instrument]
3784 pub(super) async fn sequence_alter_connection(
3785 &mut self,
3786 ctx: ExecuteContext,
3787 AlterConnectionPlan { id, action }: AlterConnectionPlan,
3788 ) {
3789 match action {
3790 AlterConnectionAction::RotateKeys => {
3791 self.sequence_rotate_keys(ctx, id).await;
3792 }
3793 AlterConnectionAction::AlterOptions {
3794 set_options,
3795 drop_options,
3796 validate,
3797 } => {
3798 self.sequence_alter_connection_options(ctx, id, set_options, drop_options, validate)
3799 .await
3800 }
3801 }
3802 }
3803
3804 #[instrument]
3805 async fn sequence_alter_connection_options(
3806 &mut self,
3807 mut ctx: ExecuteContext,
3808 id: CatalogItemId,
3809 set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<mz_sql::names::Aug>>>,
3810 drop_options: BTreeSet<ConnectionOptionName>,
3811 validate: bool,
3812 ) {
3813 let cur_entry = self.catalog().get_entry(&id);
3814 let cur_conn = cur_entry.connection().expect("known to be connection");
3815 let connection_gid = cur_conn.global_id();
3816
3817 let inner = || -> Result<Connection, AdapterError> {
3818 let create_conn_stmt = match mz_sql::parse::parse(&cur_conn.create_sql)
3820 .expect("invalid create sql persisted to catalog")
3821 .into_element()
3822 .ast
3823 {
3824 Statement::CreateConnection(stmt) => stmt,
3825 _ => unreachable!("proved type is source"),
3826 };
3827
3828 let catalog = self.catalog().for_system_session();
3829
3830 let (mut create_conn_stmt, resolved_ids) =
3832 mz_sql::names::resolve(&catalog, create_conn_stmt)
3833 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3834
3835 create_conn_stmt
3837 .values
3838 .retain(|o| !set_options.contains_key(&o.name) && !drop_options.contains(&o.name));
3839
3840 create_conn_stmt.values.extend(
3842 set_options
3843 .into_iter()
3844 .map(|(name, value)| ConnectionOption { name, value }),
3845 );
3846
3847 let mut catalog = self.catalog().for_system_session();
3850 catalog.mark_id_unresolvable_for_replanning(id);
3851
3852 let plan = match mz_sql::plan::plan(
3854 None,
3855 &catalog,
3856 Statement::CreateConnection(create_conn_stmt),
3857 &Params::empty(),
3858 &resolved_ids,
3859 )
3860 .map_err(|e| AdapterError::InvalidAlter("CONNECTION", e))?
3861 {
3862 Plan::CreateConnection(plan) => plan,
3863 _ => unreachable!("create source plan is only valid response"),
3864 };
3865
3866 let create_conn_stmt = match mz_sql::parse::parse(&plan.connection.create_sql)
3868 .expect("invalid create sql persisted to catalog")
3869 .into_element()
3870 .ast
3871 {
3872 Statement::CreateConnection(stmt) => stmt,
3873 _ => unreachable!("proved type is source"),
3874 };
3875
3876 let catalog = self.catalog().for_system_session();
3877
3878 let (_, new_deps) = mz_sql::names::resolve(&catalog, create_conn_stmt)
3880 .map_err(|e| AdapterError::internal("ALTER CONNECTION", e))?;
3881
3882 Ok(Connection {
3883 create_sql: plan.connection.create_sql,
3884 global_id: cur_conn.global_id,
3885 details: plan.connection.details,
3886 resolved_ids: new_deps,
3887 })
3888 };
3889
3890 let conn = match inner() {
3891 Ok(conn) => conn,
3892 Err(e) => {
3893 return ctx.retire(Err(e));
3894 }
3895 };
3896
3897 if validate {
3898 let connection = conn
3899 .details
3900 .to_connection()
3901 .into_inline_connection(self.catalog().state());
3902
3903 let internal_cmd_tx = self.internal_cmd_tx.clone();
3904 let transient_revision = self.catalog().transient_revision();
3905 let conn_id = ctx.session().conn_id().clone();
3906 let otel_ctx = OpenTelemetryContext::obtain();
3907 let role_metadata = ctx.session().role_metadata().clone();
3908 let current_storage_parameters = self.controller.storage.config().clone();
3909
3910 task::spawn(
3911 || format!("validate_alter_connection:{conn_id}"),
3912 async move {
3913 let resolved_ids = conn.resolved_ids.clone();
3914 let dependency_ids: BTreeSet<_> = resolved_ids.items().copied().collect();
3915 let result = match connection.validate(id, ¤t_storage_parameters).await {
3916 Ok(()) => Ok(conn),
3917 Err(err) => Err(err.into()),
3918 };
3919
3920 let result = internal_cmd_tx.send(Message::AlterConnectionValidationReady(
3922 AlterConnectionValidationReady {
3923 ctx,
3924 result,
3925 connection_id: id,
3926 connection_gid,
3927 plan_validity: PlanValidity::new(
3928 transient_revision,
3929 dependency_ids.clone(),
3930 None,
3931 None,
3932 role_metadata,
3933 ),
3934 otel_ctx,
3935 resolved_ids,
3936 },
3937 ));
3938 if let Err(e) = result {
3939 tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
3940 }
3941 },
3942 );
3943 } else {
3944 let result = self
3945 .sequence_alter_connection_stage_finish(ctx.session_mut(), id, conn)
3946 .await;
3947 ctx.retire(result);
3948 }
3949 }
3950
3951 #[instrument]
3952 pub(crate) async fn sequence_alter_connection_stage_finish(
3953 &mut self,
3954 session: &mut Session,
3955 id: CatalogItemId,
3956 connection: Connection,
3957 ) -> Result<ExecuteResponse, AdapterError> {
3958 match self.catalog.get_entry(&id).item() {
3959 CatalogItem::Connection(curr_conn) => {
3960 curr_conn
3961 .details
3962 .to_connection()
3963 .alter_compatible(curr_conn.global_id, &connection.details.to_connection())
3964 .map_err(StorageError::from)?;
3965 }
3966 _ => unreachable!("known to be a connection"),
3967 };
3968
3969 let ops = vec![catalog::Op::UpdateItem {
3970 id,
3971 name: self.catalog.get_entry(&id).name().clone(),
3972 to_item: CatalogItem::Connection(connection.clone()),
3973 }];
3974
3975 self.catalog_transact(Some(session), ops).await?;
3976
3977 match connection.details {
3978 ConnectionDetails::AwsPrivatelink(ref privatelink) => {
3979 let spec = VpcEndpointConfig {
3980 aws_service_name: privatelink.service_name.to_owned(),
3981 availability_zone_ids: privatelink.availability_zones.to_owned(),
3982 };
3983 self.cloud_resource_controller
3984 .as_ref()
3985 .ok_or(AdapterError::Unsupported("AWS PrivateLink connections"))?
3986 .ensure_vpc_endpoint(id, spec)
3987 .await?;
3988 }
3989 _ => {}
3990 };
3991
3992 let entry = self.catalog().get_entry(&id);
3993
3994 let mut connections = VecDeque::new();
3995 connections.push_front(entry.id());
3996
3997 let mut source_connections = BTreeMap::new();
3998 let mut sink_connections = BTreeMap::new();
3999 let mut source_export_data_configs = BTreeMap::new();
4000
4001 while let Some(id) = connections.pop_front() {
4002 for id in self.catalog.get_entry(&id).used_by() {
4003 let entry = self.catalog.get_entry(id);
4004 match entry.item() {
4005 CatalogItem::Connection(_) => connections.push_back(*id),
4006 CatalogItem::Source(source) => {
4007 let desc = match &entry.source().expect("known to be source").data_source {
4008 DataSourceDesc::Ingestion { ingestion_desc, .. } => ingestion_desc
4009 .desc
4010 .clone()
4011 .into_inline_connection(self.catalog().state()),
4012 _ => unreachable!("only ingestions reference connections"),
4013 };
4014
4015 source_connections.insert(source.global_id, desc.connection);
4016 }
4017 CatalogItem::Sink(sink) => {
4018 let export = entry.sink().expect("known to be sink");
4019 sink_connections.insert(
4020 sink.global_id,
4021 export
4022 .connection
4023 .clone()
4024 .into_inline_connection(self.catalog().state()),
4025 );
4026 }
4027 CatalogItem::Table(table) => {
4028 if let Some((_, _, _, export_data_config)) = entry.source_export_details() {
4031 let data_config = export_data_config.clone();
4032 source_export_data_configs.insert(
4033 table.global_id_writes(),
4034 data_config.into_inline_connection(self.catalog().state()),
4035 );
4036 }
4037 }
4038 t => unreachable!("connection dependency not expected on {:?}", t),
4039 }
4040 }
4041 }
4042
4043 if !source_connections.is_empty() {
4044 self.controller
4045 .storage
4046 .alter_ingestion_connections(source_connections)
4047 .await
4048 .unwrap_or_terminate("cannot fail to alter ingestion connection");
4049 }
4050
4051 if !sink_connections.is_empty() {
4052 self.controller
4053 .storage
4054 .alter_export_connections(sink_connections)
4055 .await
4056 .unwrap_or_terminate("altering exports after txn must succeed");
4057 }
4058
4059 if !source_export_data_configs.is_empty() {
4060 self.controller
4061 .storage
4062 .alter_ingestion_export_data_configs(source_export_data_configs)
4063 .await
4064 .unwrap_or_terminate("altering source export data configs after txn must succeed");
4065 }
4066
4067 Ok(ExecuteResponse::AlteredObject(ObjectType::Connection))
4068 }
4069
4070 #[instrument]
4071 pub(super) async fn sequence_alter_source(
4072 &mut self,
4073 session: &Session,
4074 plan::AlterSourcePlan {
4075 item_id,
4076 ingestion_id,
4077 action,
4078 }: plan::AlterSourcePlan,
4079 ) -> Result<ExecuteResponse, AdapterError> {
4080 let cur_entry = self.catalog().get_entry(&item_id);
4081 let cur_source = cur_entry.source().expect("known to be source");
4082
4083 let create_sql_to_stmt_deps = |coord: &Coordinator, err_cx, create_source_sql| {
4084 let create_source_stmt = match mz_sql::parse::parse(create_source_sql)
4086 .expect("invalid create sql persisted to catalog")
4087 .into_element()
4088 .ast
4089 {
4090 Statement::CreateSource(stmt) => stmt,
4091 _ => unreachable!("proved type is source"),
4092 };
4093
4094 let catalog = coord.catalog().for_system_session();
4095
4096 mz_sql::names::resolve(&catalog, create_source_stmt)
4098 .map_err(|e| AdapterError::internal(err_cx, e))
4099 };
4100
4101 match action {
4102 plan::AlterSourceAction::AddSubsourceExports {
4103 subsources,
4104 options,
4105 } => {
4106 const ALTER_SOURCE: &str = "ALTER SOURCE...ADD SUBSOURCES";
4107
4108 let mz_sql::plan::AlterSourceAddSubsourceOptionExtracted {
4109 text_columns: mut new_text_columns,
4110 exclude_columns: mut new_exclude_columns,
4111 ..
4112 } = options.try_into()?;
4113
4114 let (mut create_source_stmt, resolved_ids) =
4116 create_sql_to_stmt_deps(self, ALTER_SOURCE, cur_entry.create_sql())?;
4117
4118 let catalog = self.catalog();
4120 let curr_references: BTreeSet<_> = catalog
4121 .get_entry(&item_id)
4122 .used_by()
4123 .into_iter()
4124 .filter_map(|subsource| {
4125 catalog
4126 .get_entry(subsource)
4127 .subsource_details()
4128 .map(|(_id, reference, _details)| reference)
4129 })
4130 .collect();
4131
4132 let purification_err =
4135 || AdapterError::internal(ALTER_SOURCE, "error in subsource purification");
4136
4137 match &mut create_source_stmt.connection {
4141 CreateSourceConnection::Postgres {
4142 options: curr_options,
4143 ..
4144 } => {
4145 let mz_sql::plan::PgConfigOptionExtracted {
4146 mut text_columns, ..
4147 } = curr_options.clone().try_into()?;
4148
4149 curr_options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
4152
4153 text_columns.retain(|column_qualified_reference| {
4155 mz_ore::soft_assert_eq_or_log!(
4156 column_qualified_reference.0.len(),
4157 4,
4158 "all TEXT COLUMNS values must be column-qualified references"
4159 );
4160 let mut table = column_qualified_reference.clone();
4161 table.0.truncate(3);
4162 curr_references.contains(&table)
4163 });
4164
4165 new_text_columns.extend(text_columns);
4167
4168 if !new_text_columns.is_empty() {
4170 new_text_columns.sort();
4171 let new_text_columns = new_text_columns
4172 .into_iter()
4173 .map(WithOptionValue::UnresolvedItemName)
4174 .collect();
4175
4176 curr_options.push(PgConfigOption {
4177 name: PgConfigOptionName::TextColumns,
4178 value: Some(WithOptionValue::Sequence(new_text_columns)),
4179 });
4180 }
4181 }
4182 CreateSourceConnection::MySql {
4183 options: curr_options,
4184 ..
4185 } => {
4186 let mz_sql::plan::MySqlConfigOptionExtracted {
4187 mut text_columns,
4188 mut exclude_columns,
4189 ..
4190 } = curr_options.clone().try_into()?;
4191
4192 curr_options.retain(|o| {
4195 !matches!(
4196 o.name,
4197 MySqlConfigOptionName::TextColumns
4198 | MySqlConfigOptionName::ExcludeColumns
4199 )
4200 });
4201
4202 let column_referenced =
4204 |column_qualified_reference: &UnresolvedItemName| {
4205 mz_ore::soft_assert_eq_or_log!(
4206 column_qualified_reference.0.len(),
4207 3,
4208 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
4209 );
4210 let mut table = column_qualified_reference.clone();
4211 table.0.truncate(2);
4212 curr_references.contains(&table)
4213 };
4214 text_columns.retain(column_referenced);
4215 exclude_columns.retain(column_referenced);
4216
4217 new_text_columns.extend(text_columns);
4219 new_exclude_columns.extend(exclude_columns);
4220
4221 if !new_text_columns.is_empty() {
4223 new_text_columns.sort();
4224 let new_text_columns = new_text_columns
4225 .into_iter()
4226 .map(WithOptionValue::UnresolvedItemName)
4227 .collect();
4228
4229 curr_options.push(MySqlConfigOption {
4230 name: MySqlConfigOptionName::TextColumns,
4231 value: Some(WithOptionValue::Sequence(new_text_columns)),
4232 });
4233 }
4234 if !new_exclude_columns.is_empty() {
4236 new_exclude_columns.sort();
4237 let new_exclude_columns = new_exclude_columns
4238 .into_iter()
4239 .map(WithOptionValue::UnresolvedItemName)
4240 .collect();
4241
4242 curr_options.push(MySqlConfigOption {
4243 name: MySqlConfigOptionName::ExcludeColumns,
4244 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
4245 });
4246 }
4247 }
4248 CreateSourceConnection::SqlServer {
4249 options: curr_options,
4250 ..
4251 } => {
4252 let mz_sql::plan::SqlServerConfigOptionExtracted {
4253 mut text_columns,
4254 mut exclude_columns,
4255 ..
4256 } = curr_options.clone().try_into()?;
4257
4258 curr_options.retain(|o| {
4261 !matches!(
4262 o.name,
4263 SqlServerConfigOptionName::TextColumns
4264 | SqlServerConfigOptionName::ExcludeColumns
4265 )
4266 });
4267
4268 let column_referenced =
4270 |column_qualified_reference: &UnresolvedItemName| {
4271 mz_ore::soft_assert_eq_or_log!(
4272 column_qualified_reference.0.len(),
4273 3,
4274 "all TEXT COLUMNS & EXCLUDE COLUMNS values must be column-qualified references"
4275 );
4276 let mut table = column_qualified_reference.clone();
4277 table.0.truncate(2);
4278 curr_references.contains(&table)
4279 };
4280 text_columns.retain(column_referenced);
4281 exclude_columns.retain(column_referenced);
4282
4283 new_text_columns.extend(text_columns);
4285 new_exclude_columns.extend(exclude_columns);
4286
4287 if !new_text_columns.is_empty() {
4289 new_text_columns.sort();
4290 let new_text_columns = new_text_columns
4291 .into_iter()
4292 .map(WithOptionValue::UnresolvedItemName)
4293 .collect();
4294
4295 curr_options.push(SqlServerConfigOption {
4296 name: SqlServerConfigOptionName::TextColumns,
4297 value: Some(WithOptionValue::Sequence(new_text_columns)),
4298 });
4299 }
4300 if !new_exclude_columns.is_empty() {
4302 new_exclude_columns.sort();
4303 let new_exclude_columns = new_exclude_columns
4304 .into_iter()
4305 .map(WithOptionValue::UnresolvedItemName)
4306 .collect();
4307
4308 curr_options.push(SqlServerConfigOption {
4309 name: SqlServerConfigOptionName::ExcludeColumns,
4310 value: Some(WithOptionValue::Sequence(new_exclude_columns)),
4311 });
4312 }
4313 }
4314 _ => return Err(purification_err()),
4315 };
4316
4317 let mut catalog = self.catalog().for_system_session();
4318 catalog.mark_id_unresolvable_for_replanning(cur_entry.id());
4319
4320 let plan = match mz_sql::plan::plan(
4322 None,
4323 &catalog,
4324 Statement::CreateSource(create_source_stmt),
4325 &Params::empty(),
4326 &resolved_ids,
4327 )
4328 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?
4329 {
4330 Plan::CreateSource(plan) => plan,
4331 _ => unreachable!("create source plan is only valid response"),
4332 };
4333
4334 let source = Source::new(
4338 plan,
4339 cur_source.global_id,
4340 resolved_ids,
4341 cur_source.custom_logical_compaction_window,
4342 cur_source.is_retained_metrics_object,
4343 );
4344
4345 let source_compaction_window = source.custom_logical_compaction_window;
4346
4347 let desc = match &source.data_source {
4349 DataSourceDesc::Ingestion { ingestion_desc, .. } => ingestion_desc
4350 .desc
4351 .clone()
4352 .into_inline_connection(self.catalog().state()),
4353 _ => unreachable!("already verified of type ingestion"),
4354 };
4355
4356 self.controller
4357 .storage
4358 .check_alter_ingestion_source_desc(ingestion_id, &desc)
4359 .map_err(|e| AdapterError::internal(ALTER_SOURCE, e))?;
4360
4361 let mut ops = vec![catalog::Op::UpdateItem {
4364 id: item_id,
4365 name: self.catalog.get_entry(&item_id).name().clone(),
4368 to_item: CatalogItem::Source(source),
4369 }];
4370
4371 let CreateSourceInner {
4372 ops: new_ops,
4373 sources,
4374 if_not_exists_ids,
4375 } = self.create_source_inner(session, subsources).await?;
4376
4377 ops.extend(new_ops.into_iter());
4378
4379 assert!(
4380 if_not_exists_ids.is_empty(),
4381 "IF NOT EXISTS not supported for ALTER SOURCE...ADD SUBSOURCES"
4382 );
4383
4384 self.catalog_transact(Some(session), ops).await?;
4385
4386 let mut item_ids = BTreeSet::new();
4387 let mut collections = Vec::with_capacity(sources.len());
4388 for (item_id, source) in sources {
4389 let status_id = self.catalog().resolve_builtin_storage_collection(
4390 &mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY,
4391 );
4392 let source_status_collection_id =
4393 Some(self.catalog().get_entry(&status_id).latest_global_id());
4394
4395 let (data_source, status_collection_id) = match source.data_source {
4396 DataSourceDesc::IngestionExport {
4398 ingestion_id,
4399 external_reference: _,
4400 details,
4401 data_config,
4402 } => {
4403 let ingestion_id =
4406 self.catalog().get_entry(&ingestion_id).latest_global_id();
4407 (
4408 DataSource::IngestionExport {
4409 ingestion_id,
4410 details,
4411 data_config: data_config
4412 .into_inline_connection(self.catalog().state()),
4413 },
4414 source_status_collection_id,
4415 )
4416 }
4417 o => {
4418 unreachable!(
4419 "ALTER SOURCE...ADD SUBSOURCE only creates SourceExport but got {:?}",
4420 o
4421 )
4422 }
4423 };
4424
4425 collections.push((
4426 source.global_id,
4427 CollectionDescription {
4428 desc: source.desc.clone(),
4429 data_source,
4430 since: None,
4431 status_collection_id,
4432 timeline: Some(source.timeline.clone()),
4433 },
4434 ));
4435
4436 item_ids.insert(item_id);
4437 }
4438
4439 let storage_metadata = self.catalog.state().storage_metadata();
4440
4441 self.controller
4442 .storage
4443 .create_collections(storage_metadata, None, collections)
4444 .await
4445 .unwrap_or_terminate("cannot fail to create collections");
4446
4447 self.initialize_storage_read_policies(
4448 item_ids,
4449 source_compaction_window.unwrap_or(CompactionWindow::Default),
4450 )
4451 .await;
4452 }
4453 plan::AlterSourceAction::RefreshReferences { references } => {
4454 self.catalog_transact(
4455 Some(session),
4456 vec![catalog::Op::UpdateSourceReferences {
4457 source_id: item_id,
4458 references: references.into(),
4459 }],
4460 )
4461 .await?;
4462 }
4463 }
4464
4465 Ok(ExecuteResponse::AlteredObject(ObjectType::Source))
4466 }
4467
4468 #[instrument]
4469 pub(super) async fn sequence_alter_system_set(
4470 &mut self,
4471 session: &Session,
4472 plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
4473 ) -> Result<ExecuteResponse, AdapterError> {
4474 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4475 if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
4477 self.validate_alter_system_network_policy(session, &value)?;
4478 }
4479
4480 let op = match value {
4481 plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
4482 name: name.clone(),
4483 value: OwnedVarInput::SqlSet(values),
4484 },
4485 plan::VariableValue::Default => {
4486 catalog::Op::ResetSystemConfiguration { name: name.clone() }
4487 }
4488 };
4489 self.catalog_transact(Some(session), vec![op]).await?;
4490
4491 session.add_notice(AdapterNotice::VarDefaultUpdated {
4492 role: None,
4493 var_name: Some(name),
4494 });
4495 Ok(ExecuteResponse::AlteredSystemConfiguration)
4496 }
4497
4498 #[instrument]
4499 pub(super) async fn sequence_alter_system_reset(
4500 &mut self,
4501 session: &Session,
4502 plan::AlterSystemResetPlan { name }: plan::AlterSystemResetPlan,
4503 ) -> Result<ExecuteResponse, AdapterError> {
4504 self.is_user_allowed_to_alter_system(session, Some(&name))?;
4505 let op = catalog::Op::ResetSystemConfiguration { name: name.clone() };
4506 self.catalog_transact(Some(session), vec![op]).await?;
4507 session.add_notice(AdapterNotice::VarDefaultUpdated {
4508 role: None,
4509 var_name: Some(name),
4510 });
4511 Ok(ExecuteResponse::AlteredSystemConfiguration)
4512 }
4513
4514 #[instrument]
4515 pub(super) async fn sequence_alter_system_reset_all(
4516 &mut self,
4517 session: &Session,
4518 _: plan::AlterSystemResetAllPlan,
4519 ) -> Result<ExecuteResponse, AdapterError> {
4520 self.is_user_allowed_to_alter_system(session, None)?;
4521 let op = catalog::Op::ResetAllSystemConfiguration;
4522 self.catalog_transact(Some(session), vec![op]).await?;
4523 session.add_notice(AdapterNotice::VarDefaultUpdated {
4524 role: None,
4525 var_name: None,
4526 });
4527 Ok(ExecuteResponse::AlteredSystemConfiguration)
4528 }
4529
4530 fn is_user_allowed_to_alter_system(
4532 &self,
4533 session: &Session,
4534 var_name: Option<&str>,
4535 ) -> Result<(), AdapterError> {
4536 match (session.user().kind(), var_name) {
4537 (UserKind::Superuser, None) if session.user().is_internal() => Ok(()),
4539 (UserKind::Superuser, Some(name))
4541 if session.user().is_internal()
4542 || self.catalog().system_config().user_modifiable(name) =>
4543 {
4544 let var = self.catalog().system_config().get(name)?;
4547 var.visible(session.user(), self.catalog().system_config())?;
4548 Ok(())
4549 }
4550 (UserKind::Regular, Some(name))
4553 if self.catalog().system_config().user_modifiable(name) =>
4554 {
4555 Err(AdapterError::Unauthorized(
4556 rbac::UnauthorizedError::Superuser {
4557 action: format!("toggle the '{name}' system configuration parameter"),
4558 },
4559 ))
4560 }
4561 _ => Err(AdapterError::Unauthorized(
4562 rbac::UnauthorizedError::MzSystem {
4563 action: "alter system".into(),
4564 },
4565 )),
4566 }
4567 }
4568
4569 fn validate_alter_system_network_policy(
4570 &self,
4571 session: &Session,
4572 policy_value: &plan::VariableValue,
4573 ) -> Result<(), AdapterError> {
4574 let policy_name = match &policy_value {
4575 plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
4577 plan::VariableValue::Values(values) if values.len() == 1 => {
4578 values.iter().next().cloned()
4579 }
4580 plan::VariableValue::Values(values) => {
4581 tracing::warn!(?values, "can't set multiple network policies at once");
4582 None
4583 }
4584 };
4585 let maybe_network_policy = policy_name
4586 .as_ref()
4587 .and_then(|name| self.catalog.get_network_policy_by_name(name));
4588 let Some(network_policy) = maybe_network_policy else {
4589 return Err(AdapterError::PlanError(plan::PlanError::VarError(
4590 VarError::InvalidParameterValue {
4591 name: NETWORK_POLICY.name(),
4592 invalid_values: vec![policy_name.unwrap_or_else(|| "<none>".to_string())],
4593 reason: "no network policy with such name exists".to_string(),
4594 },
4595 )));
4596 };
4597 self.validate_alter_network_policy(session, &network_policy.rules)
4598 }
4599
4600 fn validate_alter_network_policy(
4605 &self,
4606 session: &Session,
4607 policy_rules: &Vec<NetworkPolicyRule>,
4608 ) -> Result<(), AdapterError> {
4609 if session.user().is_internal() {
4612 return Ok(());
4613 }
4614 if let Some(ip) = session.meta().client_ip() {
4615 validate_ip_with_policy_rules(ip, policy_rules)
4616 .map_err(|_| AdapterError::PlanError(plan::PlanError::NetworkPolicyLockoutError))?;
4617 } else {
4618 return Err(AdapterError::NetworkPolicyDenied(
4621 NetworkPolicyError::MissingIp,
4622 ));
4623 }
4624 Ok(())
4625 }
4626
4627 #[instrument]
4629 pub(super) fn sequence_execute(
4630 &mut self,
4631 session: &mut Session,
4632 plan: plan::ExecutePlan,
4633 ) -> Result<String, AdapterError> {
4634 Self::verify_prepared_statement(self.catalog(), session, &plan.name)?;
4636 let ps = session
4637 .get_prepared_statement_unverified(&plan.name)
4638 .expect("known to exist");
4639 let stmt = ps.stmt().cloned();
4640 let desc = ps.desc().clone();
4641 let state_revision = ps.state_revision;
4642 let logging = Arc::clone(ps.logging());
4643 session.create_new_portal(stmt, logging, desc, plan.params, Vec::new(), state_revision)
4644 }
4645
4646 #[instrument]
4647 pub(super) async fn sequence_grant_privileges(
4648 &mut self,
4649 session: &Session,
4650 plan::GrantPrivilegesPlan {
4651 update_privileges,
4652 grantees,
4653 }: plan::GrantPrivilegesPlan,
4654 ) -> Result<ExecuteResponse, AdapterError> {
4655 self.sequence_update_privileges(
4656 session,
4657 update_privileges,
4658 grantees,
4659 UpdatePrivilegeVariant::Grant,
4660 )
4661 .await
4662 }
4663
4664 #[instrument]
4665 pub(super) async fn sequence_revoke_privileges(
4666 &mut self,
4667 session: &Session,
4668 plan::RevokePrivilegesPlan {
4669 update_privileges,
4670 revokees,
4671 }: plan::RevokePrivilegesPlan,
4672 ) -> Result<ExecuteResponse, AdapterError> {
4673 self.sequence_update_privileges(
4674 session,
4675 update_privileges,
4676 revokees,
4677 UpdatePrivilegeVariant::Revoke,
4678 )
4679 .await
4680 }
4681
4682 #[instrument]
4683 async fn sequence_update_privileges(
4684 &mut self,
4685 session: &Session,
4686 update_privileges: Vec<UpdatePrivilege>,
4687 grantees: Vec<RoleId>,
4688 variant: UpdatePrivilegeVariant,
4689 ) -> Result<ExecuteResponse, AdapterError> {
4690 let mut ops = Vec::with_capacity(update_privileges.len() * grantees.len());
4691 let mut warnings = Vec::new();
4692 let catalog = self.catalog().for_session(session);
4693
4694 for UpdatePrivilege {
4695 acl_mode,
4696 target_id,
4697 grantor,
4698 } in update_privileges
4699 {
4700 let actual_object_type = catalog.get_system_object_type(&target_id);
4701 if actual_object_type.is_relation() {
4704 let applicable_privileges = rbac::all_object_privileges(actual_object_type);
4705 let non_applicable_privileges = acl_mode.difference(applicable_privileges);
4706 if !non_applicable_privileges.is_empty() {
4707 let object_description =
4708 ErrorMessageObjectDescription::from_sys_id(&target_id, &catalog);
4709 warnings.push(AdapterNotice::NonApplicablePrivilegeTypes {
4710 non_applicable_privileges,
4711 object_description,
4712 })
4713 }
4714 }
4715
4716 if let SystemObjectId::Object(object_id) = &target_id {
4717 self.catalog()
4718 .ensure_not_reserved_object(object_id, session.conn_id())?;
4719 }
4720
4721 let privileges = self
4722 .catalog()
4723 .get_privileges(&target_id, session.conn_id())
4724 .ok_or(AdapterError::Unsupported(
4727 "GRANTs/REVOKEs on an object type with no privileges",
4728 ))?;
4729
4730 for grantee in &grantees {
4731 self.catalog().ensure_not_system_role(grantee)?;
4732 self.catalog().ensure_not_predefined_role(grantee)?;
4733 let existing_privilege = privileges
4734 .get_acl_item(grantee, &grantor)
4735 .map(Cow::Borrowed)
4736 .unwrap_or_else(|| Cow::Owned(MzAclItem::empty(*grantee, grantor)));
4737
4738 match variant {
4739 UpdatePrivilegeVariant::Grant
4740 if !existing_privilege.acl_mode.contains(acl_mode) =>
4741 {
4742 ops.push(catalog::Op::UpdatePrivilege {
4743 target_id: target_id.clone(),
4744 privilege: MzAclItem {
4745 grantee: *grantee,
4746 grantor,
4747 acl_mode,
4748 },
4749 variant,
4750 });
4751 }
4752 UpdatePrivilegeVariant::Revoke
4753 if !existing_privilege
4754 .acl_mode
4755 .intersection(acl_mode)
4756 .is_empty() =>
4757 {
4758 ops.push(catalog::Op::UpdatePrivilege {
4759 target_id: target_id.clone(),
4760 privilege: MzAclItem {
4761 grantee: *grantee,
4762 grantor,
4763 acl_mode,
4764 },
4765 variant,
4766 });
4767 }
4768 _ => {}
4770 }
4771 }
4772 }
4773
4774 if ops.is_empty() {
4775 session.add_notices(warnings);
4776 return Ok(variant.into());
4777 }
4778
4779 let res = self
4780 .catalog_transact(Some(session), ops)
4781 .await
4782 .map(|_| match variant {
4783 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
4784 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
4785 });
4786 if res.is_ok() {
4787 session.add_notices(warnings);
4788 }
4789 res
4790 }
4791
4792 #[instrument]
4793 pub(super) async fn sequence_alter_default_privileges(
4794 &mut self,
4795 session: &Session,
4796 plan::AlterDefaultPrivilegesPlan {
4797 privilege_objects,
4798 privilege_acl_items,
4799 is_grant,
4800 }: plan::AlterDefaultPrivilegesPlan,
4801 ) -> Result<ExecuteResponse, AdapterError> {
4802 let mut ops = Vec::with_capacity(privilege_objects.len() * privilege_acl_items.len());
4803 let variant = if is_grant {
4804 UpdatePrivilegeVariant::Grant
4805 } else {
4806 UpdatePrivilegeVariant::Revoke
4807 };
4808 for privilege_object in &privilege_objects {
4809 self.catalog()
4810 .ensure_not_system_role(&privilege_object.role_id)?;
4811 self.catalog()
4812 .ensure_not_predefined_role(&privilege_object.role_id)?;
4813 if let Some(database_id) = privilege_object.database_id {
4814 self.catalog()
4815 .ensure_not_reserved_object(&database_id.into(), session.conn_id())?;
4816 }
4817 if let Some(schema_id) = privilege_object.schema_id {
4818 let database_spec: ResolvedDatabaseSpecifier = privilege_object.database_id.into();
4819 let schema_spec: SchemaSpecifier = schema_id.into();
4820
4821 self.catalog().ensure_not_reserved_object(
4822 &(database_spec, schema_spec).into(),
4823 session.conn_id(),
4824 )?;
4825 }
4826 for privilege_acl_item in &privilege_acl_items {
4827 self.catalog()
4828 .ensure_not_system_role(&privilege_acl_item.grantee)?;
4829 self.catalog()
4830 .ensure_not_predefined_role(&privilege_acl_item.grantee)?;
4831 ops.push(catalog::Op::UpdateDefaultPrivilege {
4832 privilege_object: privilege_object.clone(),
4833 privilege_acl_item: privilege_acl_item.clone(),
4834 variant,
4835 })
4836 }
4837 }
4838
4839 self.catalog_transact(Some(session), ops).await?;
4840 Ok(ExecuteResponse::AlteredDefaultPrivileges)
4841 }
4842
4843 #[instrument]
4844 pub(super) async fn sequence_grant_role(
4845 &mut self,
4846 session: &Session,
4847 plan::GrantRolePlan {
4848 role_ids,
4849 member_ids,
4850 grantor_id,
4851 }: plan::GrantRolePlan,
4852 ) -> Result<ExecuteResponse, AdapterError> {
4853 let catalog = self.catalog();
4854 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4855 for role_id in role_ids {
4856 for member_id in &member_ids {
4857 let member_membership: BTreeSet<_> =
4858 catalog.get_role(member_id).membership().keys().collect();
4859 if member_membership.contains(&role_id) {
4860 let role_name = catalog.get_role(&role_id).name().to_string();
4861 let member_name = catalog.get_role(member_id).name().to_string();
4862 catalog.ensure_not_reserved_role(member_id)?;
4864 catalog.ensure_grantable_role(&role_id)?;
4865 session.add_notice(AdapterNotice::RoleMembershipAlreadyExists {
4866 role_name,
4867 member_name,
4868 });
4869 } else {
4870 ops.push(catalog::Op::GrantRole {
4871 role_id,
4872 member_id: *member_id,
4873 grantor_id,
4874 });
4875 }
4876 }
4877 }
4878
4879 if ops.is_empty() {
4880 return Ok(ExecuteResponse::GrantedRole);
4881 }
4882
4883 self.catalog_transact(Some(session), ops)
4884 .await
4885 .map(|_| ExecuteResponse::GrantedRole)
4886 }
4887
4888 #[instrument]
4889 pub(super) async fn sequence_revoke_role(
4890 &mut self,
4891 session: &Session,
4892 plan::RevokeRolePlan {
4893 role_ids,
4894 member_ids,
4895 grantor_id,
4896 }: plan::RevokeRolePlan,
4897 ) -> Result<ExecuteResponse, AdapterError> {
4898 let catalog = self.catalog();
4899 let mut ops = Vec::with_capacity(role_ids.len() * member_ids.len());
4900 for role_id in role_ids {
4901 for member_id in &member_ids {
4902 let member_membership: BTreeSet<_> =
4903 catalog.get_role(member_id).membership().keys().collect();
4904 if !member_membership.contains(&role_id) {
4905 let role_name = catalog.get_role(&role_id).name().to_string();
4906 let member_name = catalog.get_role(member_id).name().to_string();
4907 catalog.ensure_not_reserved_role(member_id)?;
4909 catalog.ensure_grantable_role(&role_id)?;
4910 session.add_notice(AdapterNotice::RoleMembershipDoesNotExists {
4911 role_name,
4912 member_name,
4913 });
4914 } else {
4915 ops.push(catalog::Op::RevokeRole {
4916 role_id,
4917 member_id: *member_id,
4918 grantor_id,
4919 });
4920 }
4921 }
4922 }
4923
4924 if ops.is_empty() {
4925 return Ok(ExecuteResponse::RevokedRole);
4926 }
4927
4928 self.catalog_transact(Some(session), ops)
4929 .await
4930 .map(|_| ExecuteResponse::RevokedRole)
4931 }
4932
4933 #[instrument]
4934 pub(super) async fn sequence_alter_owner(
4935 &mut self,
4936 session: &Session,
4937 plan::AlterOwnerPlan {
4938 id,
4939 object_type,
4940 new_owner,
4941 }: plan::AlterOwnerPlan,
4942 ) -> Result<ExecuteResponse, AdapterError> {
4943 let mut ops = vec![catalog::Op::UpdateOwner {
4944 id: id.clone(),
4945 new_owner,
4946 }];
4947
4948 match &id {
4949 ObjectId::Item(global_id) => {
4950 let entry = self.catalog().get_entry(global_id);
4951
4952 if entry.is_index() {
4954 let name = self
4955 .catalog()
4956 .resolve_full_name(entry.name(), Some(session.conn_id()))
4957 .to_string();
4958 session.add_notice(AdapterNotice::AlterIndexOwner { name });
4959 return Ok(ExecuteResponse::AlteredObject(object_type));
4960 }
4961
4962 let dependent_index_ops = entry
4964 .used_by()
4965 .into_iter()
4966 .filter(|id| self.catalog().get_entry(id).is_index())
4967 .map(|id| catalog::Op::UpdateOwner {
4968 id: ObjectId::Item(*id),
4969 new_owner,
4970 });
4971 ops.extend(dependent_index_ops);
4972
4973 let dependent_subsources =
4975 entry
4976 .progress_id()
4977 .into_iter()
4978 .map(|item_id| catalog::Op::UpdateOwner {
4979 id: ObjectId::Item(item_id),
4980 new_owner,
4981 });
4982 ops.extend(dependent_subsources);
4983 }
4984 ObjectId::Cluster(cluster_id) => {
4985 let cluster = self.catalog().get_cluster(*cluster_id);
4986 let managed_cluster_replica_ops =
4988 cluster.replicas().map(|replica| catalog::Op::UpdateOwner {
4989 id: ObjectId::ClusterReplica((cluster.id(), replica.replica_id())),
4990 new_owner,
4991 });
4992 ops.extend(managed_cluster_replica_ops);
4993 }
4994 _ => {}
4995 }
4996
4997 self.catalog_transact(Some(session), ops)
4998 .await
4999 .map(|_| ExecuteResponse::AlteredObject(object_type))
5000 }
5001
5002 #[instrument]
5003 pub(super) async fn sequence_reassign_owned(
5004 &mut self,
5005 session: &Session,
5006 plan::ReassignOwnedPlan {
5007 old_roles,
5008 new_role,
5009 reassign_ids,
5010 }: plan::ReassignOwnedPlan,
5011 ) -> Result<ExecuteResponse, AdapterError> {
5012 for role_id in old_roles.iter().chain(iter::once(&new_role)) {
5013 self.catalog().ensure_not_reserved_role(role_id)?;
5014 }
5015
5016 let ops = reassign_ids
5017 .into_iter()
5018 .map(|id| catalog::Op::UpdateOwner {
5019 id,
5020 new_owner: new_role,
5021 })
5022 .collect();
5023
5024 self.catalog_transact(Some(session), ops)
5025 .await
5026 .map(|_| ExecuteResponse::ReassignOwned)
5027 }
5028
5029 #[instrument]
5030 pub(crate) async fn handle_deferred_statement(&mut self) {
5031 let Some(DeferredPlanStatement { ctx, ps }) = self.serialized_ddl.pop_front() else {
5035 return;
5036 };
5037 match ps {
5038 crate::coord::PlanStatement::Statement { stmt, params } => {
5039 self.handle_execute_inner(stmt, params, ctx).await;
5040 }
5041 crate::coord::PlanStatement::Plan { plan, resolved_ids } => {
5042 self.sequence_plan(ctx, plan, resolved_ids).await;
5043 }
5044 }
5045 }
5046
5047 #[instrument]
5048 #[allow(clippy::unused_async)]
5050 pub(super) async fn sequence_alter_table(
5051 &mut self,
5052 ctx: &mut ExecuteContext,
5053 plan: plan::AlterTablePlan,
5054 ) -> Result<ExecuteResponse, AdapterError> {
5055 let plan::AlterTablePlan {
5056 relation_id,
5057 column_name,
5058 column_type,
5059 raw_sql_type,
5060 } = plan;
5061
5062 let id_ts = self.get_catalog_write_ts().await;
5064 let (_, new_global_id) = self.catalog.allocate_user_id(id_ts).await?;
5065 let ops = vec![catalog::Op::AlterAddColumn {
5066 id: relation_id,
5067 new_global_id,
5068 name: column_name,
5069 typ: column_type,
5070 sql: raw_sql_type,
5071 }];
5072
5073 let entry = self.catalog().get_entry(&relation_id);
5074 let CatalogItem::Table(table) = &entry.item else {
5075 let err = format!("expected table, found {:?}", entry.item);
5076 return Err(AdapterError::Internal(err));
5077 };
5078 let expected_version = table.desc.latest_version();
5080 let existing_global_id = table.global_id_writes();
5081
5082 self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
5083 Box::pin(async move {
5084 let entry = coord.catalog().get_entry(&relation_id);
5085 let CatalogItem::Table(table) = &entry.item else {
5086 panic!("programming error, expected table found {:?}", entry.item);
5087 };
5088 let table = table.clone();
5089
5090 let existing_table = crate::CollectionIdBundle {
5094 storage_ids: btreeset![existing_global_id],
5095 compute_ids: BTreeMap::new(),
5096 };
5097 let existing_table_read_hold = coord.acquire_read_holds(&existing_table);
5098
5099 let new_version = table.desc.latest_version();
5100 let new_desc = table
5101 .desc
5102 .at_version(RelationVersionSelector::Specific(new_version));
5103 let register_ts = coord.get_local_write_ts().await.timestamp;
5104
5105 coord
5107 .controller
5108 .storage
5109 .alter_table_desc(
5110 existing_global_id,
5111 new_global_id,
5112 new_desc,
5113 expected_version,
5114 register_ts,
5115 )
5116 .await
5117 .expect("failed to alter desc of table");
5118
5119 let compaction_window = table
5121 .custom_logical_compaction_window
5122 .unwrap_or(CompactionWindow::Default);
5123 coord
5124 .initialize_read_policies(
5125 &crate::CollectionIdBundle {
5126 storage_ids: btreeset![new_global_id],
5127 compute_ids: BTreeMap::new(),
5128 },
5129 compaction_window,
5130 )
5131 .await;
5132 coord.apply_local_write(register_ts).await;
5133
5134 drop(existing_table_read_hold);
5136 })
5137 })
5138 .await?;
5139
5140 Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
5141 }
5142}
5143
5144#[derive(Debug)]
5145struct CachedStatisticsOracle {
5146 cache: BTreeMap<GlobalId, usize>,
5147}
5148
5149impl CachedStatisticsOracle {
5150 pub async fn new<T: TimelyTimestamp>(
5151 ids: &BTreeSet<GlobalId>,
5152 as_of: &Antichain<T>,
5153 storage_collections: &dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T>,
5154 ) -> Result<Self, StorageError<T>> {
5155 let mut cache = BTreeMap::new();
5156
5157 for id in ids {
5158 let stats = storage_collections.snapshot_stats(*id, as_of.clone()).await;
5159
5160 match stats {
5161 Ok(stats) => {
5162 cache.insert(*id, stats.num_updates);
5163 }
5164 Err(StorageError::IdentifierMissing(id)) => {
5165 ::tracing::debug!("no statistics for {id}")
5166 }
5167 Err(e) => return Err(e),
5168 }
5169 }
5170
5171 Ok(Self { cache })
5172 }
5173}
5174
5175impl mz_transform::StatisticsOracle for CachedStatisticsOracle {
5176 fn cardinality_estimate(&self, id: GlobalId) -> Option<usize> {
5177 self.cache.get(&id).map(|estimate| *estimate)
5178 }
5179
5180 fn as_map(&self) -> BTreeMap<GlobalId, usize> {
5181 self.cache.clone()
5182 }
5183}
5184
5185impl Coordinator {
5186 pub(super) async fn statistics_oracle(
5187 &self,
5188 session: &Session,
5189 source_ids: &BTreeSet<GlobalId>,
5190 query_as_of: &Antichain<Timestamp>,
5191 is_oneshot: bool,
5192 ) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
5193 if !session.vars().enable_session_cardinality_estimates() {
5194 return Ok(Box::new(EmptyStatisticsOracle));
5195 }
5196
5197 let timeout = if is_oneshot {
5198 self.catalog()
5200 .system_config()
5201 .optimizer_oneshot_stats_timeout()
5202 } else {
5203 self.catalog().system_config().optimizer_stats_timeout()
5204 };
5205
5206 let cached_stats = mz_ore::future::timeout(
5207 timeout,
5208 CachedStatisticsOracle::new(
5209 source_ids,
5210 query_as_of,
5211 self.controller.storage_collections.as_ref(),
5212 ),
5213 )
5214 .await;
5215
5216 match cached_stats {
5217 Ok(stats) => Ok(Box::new(stats)),
5218 Err(mz_ore::future::TimeoutError::DeadlineElapsed) => {
5219 warn!(
5220 is_oneshot = is_oneshot,
5221 "optimizer statistics collection timed out after {}ms",
5222 timeout.as_millis()
5223 );
5224
5225 Ok(Box::new(EmptyStatisticsOracle))
5226 }
5227 Err(mz_ore::future::TimeoutError::Inner(e)) => Err(AdapterError::Storage(e)),
5228 }
5229 }
5230}
5231
5232pub(super) fn check_log_reads(
5241 catalog: &Catalog,
5242 cluster: &Cluster,
5243 source_ids: &BTreeSet<GlobalId>,
5244 target_replica: &mut Option<ReplicaId>,
5245 vars: &SessionVars,
5246) -> Result<impl IntoIterator<Item = AdapterNotice>, AdapterError>
5247where
5248{
5249 let log_names = source_ids
5250 .iter()
5251 .map(|gid| catalog.resolve_item_id(gid))
5252 .flat_map(|item_id| catalog.introspection_dependencies(item_id))
5253 .map(|item_id| catalog.get_entry(&item_id).name().item.clone())
5254 .collect::<Vec<_>>();
5255
5256 if log_names.is_empty() {
5257 return Ok(None);
5258 }
5259
5260 let num_replicas = cluster.replicas().count();
5264 if target_replica.is_none() {
5265 if num_replicas == 1 {
5266 *target_replica = cluster.replicas().map(|r| r.replica_id).next();
5267 } else {
5268 return Err(AdapterError::UntargetedLogRead { log_names });
5269 }
5270 }
5271
5272 let replica_id = target_replica.expect("set to `Some` above");
5275 let replica = &cluster.replica(replica_id).expect("Replica must exist");
5276 if !replica.config.compute.logging.enabled() {
5277 return Err(AdapterError::IntrospectionDisabled { log_names });
5278 }
5279
5280 Ok(vars
5281 .emit_introspection_query_notice()
5282 .then_some(AdapterNotice::PerReplicaLogRead { log_names }))
5283}
5284
5285impl Coordinator {
5286 fn emit_optimizer_notices(&self, session: &Session, notices: &Vec<RawOptimizerNotice>) {
5288 if notices.is_empty() {
5290 return;
5291 }
5292 let humanizer = self.catalog.for_session(session);
5293 let system_vars = self.catalog.system_config();
5294 for notice in notices {
5295 let kind = OptimizerNoticeKind::from(notice);
5296 let notice_enabled = match kind {
5297 OptimizerNoticeKind::IndexAlreadyExists => {
5298 system_vars.enable_notices_for_index_already_exists()
5299 }
5300 OptimizerNoticeKind::IndexTooWideForLiteralConstraints => {
5301 system_vars.enable_notices_for_index_too_wide_for_literal_constraints()
5302 }
5303 OptimizerNoticeKind::IndexKeyEmpty => {
5304 system_vars.enable_notices_for_index_empty_key()
5305 }
5306 };
5307 if notice_enabled {
5308 session.add_notice(AdapterNotice::OptimizerNotice {
5312 notice: notice.message(&humanizer, false).to_string(),
5313 hint: notice.hint(&humanizer, false).to_string(),
5314 });
5315 }
5316 self.metrics
5317 .optimization_notices
5318 .with_label_values(&[kind.metric_label()])
5319 .inc_by(1);
5320 }
5321 }
5322
5323 async fn process_dataflow_metainfo(
5325 &mut self,
5326 df_meta: DataflowMetainfo,
5327 export_id: GlobalId,
5328 ctx: Option<&mut ExecuteContext>,
5329 notice_ids: Vec<GlobalId>,
5330 ) -> Option<BuiltinTableAppendNotify> {
5331 if let Some(ctx) = ctx {
5333 self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
5334 }
5335
5336 let df_meta = self
5338 .catalog()
5339 .render_notices(df_meta, notice_ids, Some(export_id));
5340
5341 if self.catalog().state().system_config().enable_mz_notices()
5344 && !df_meta.optimizer_notices.is_empty()
5345 {
5346 let mut builtin_table_updates = Vec::with_capacity(df_meta.optimizer_notices.len());
5347 self.catalog().state().pack_optimizer_notices(
5348 &mut builtin_table_updates,
5349 df_meta.optimizer_notices.iter(),
5350 Diff::ONE,
5351 );
5352
5353 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
5355
5356 Some(
5357 self.builtin_table_update()
5358 .execute(builtin_table_updates)
5359 .await
5360 .0,
5361 )
5362 } else {
5363 self.catalog_mut().set_dataflow_metainfo(export_id, df_meta);
5365
5366 None
5367 }
5368 }
5369}