1#![allow(clippy::pub_use)]
12
13use std::collections::{BTreeMap, BTreeSet};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use futures::FutureExt;
20use futures::future::LocalBoxFuture;
21use futures::stream::FuturesOrdered;
22use http::Uri;
23use inner::return_if_err;
24use maplit::btreemap;
25use mz_catalog::memory::objects::Cluster;
26use mz_controller_types::ReplicaId;
27use mz_expr::row::RowCollection;
28use mz_expr::{MapFilterProject, MirRelationExpr, ResultSpec, RowSetFinishing};
29use mz_ore::cast::CastFrom;
30use mz_ore::tracing::OpenTelemetryContext;
31use mz_persist_client::stats::SnapshotPartStats;
32use mz_repr::explain::{ExprHumanizerExt, TransientItem};
33use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, Row, RowArena, Timestamp};
34use mz_sql::catalog::{CatalogError, SessionCatalog};
35use mz_sql::names::ResolvedIds;
36use mz_sql::plan::{
37 self, AbortTransactionPlan, CommitTransactionPlan, CopyFromSource, CreateRolePlan,
38 CreateSourcePlanBundle, FetchPlan, HirScalarExpr, MutationKind, Params, Plan, PlanKind,
39 RaisePlan,
40};
41use mz_sql::rbac;
42use mz_sql::session::metadata::SessionMetadata;
43use mz_sql::session::vars;
44use mz_sql::session::vars::SessionVars;
45use mz_sql_parser::ast::{Raw, Statement};
46use mz_storage_client::client::TableData;
47use mz_storage_client::storage_collections::StorageCollections;
48use mz_storage_types::connections::inline::IntoInlineConnection;
49use mz_storage_types::controller::StorageError;
50use mz_storage_types::stats::RelationPartStats;
51use mz_transform::dataflow::DataflowMetainfo;
52use mz_transform::notice::{OptimizerNoticeApi, OptimizerNoticeKind, RawOptimizerNotice};
53use mz_transform::{EmptyStatisticsOracle, StatisticsOracle};
54use timely::progress::Antichain;
55use tokio::sync::oneshot;
56use tracing::{Instrument, Level, Span, event, warn};
57
58use crate::ExecuteContext;
59use crate::catalog::{Catalog, CatalogState};
60use crate::command::{Command, ExecuteResponse, Response};
61use crate::coord::appends::{DeferredOp, DeferredPlan};
62use crate::coord::validity::PlanValidity;
63use crate::coord::{
64 Coordinator, DeferredPlanStatement, ExplainPlanContext, Message, PlanStatement, TargetCluster,
65 catalog_serving,
66};
67use crate::error::AdapterError;
68use crate::explain::insights::PlanInsightsContext;
69use crate::notice::AdapterNotice;
70use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
71use crate::optimize::peek;
72use crate::session::{
73 EndTransactionAction, Session, StateRevision, TransactionOps, TransactionStatus, WriteOp,
74};
75use crate::util::ClientTransmitter;
76
77mod inner;
97
98impl Coordinator {
99 pub(crate) fn sequence_plan(
103 &mut self,
104 mut ctx: ExecuteContext,
105 plan: Plan,
106 resolved_ids: ResolvedIds,
107 ) -> LocalBoxFuture<'_, ()> {
108 async move {
109 let responses = ExecuteResponse::generated_from(&PlanKind::from(&plan));
110 ctx.tx_mut().set_allowed(responses);
111
112 if self.controller.read_only() && !plan.allowed_in_read_only() {
113 ctx.retire(Err(AdapterError::ReadOnly));
114 return;
115 }
116
117 if let Some((dependencies, wait_future)) =
120 super::appends::waiting_on_startup_appends(self.catalog(), ctx.session_mut(), &plan)
121 {
122 let conn_id = ctx.session().conn_id();
123 tracing::debug!(%conn_id, "deferring plan for startup appends");
124
125 let role_metadata = ctx.session().role_metadata().clone();
126 let validity = PlanValidity::new(
127 self.catalog.transient_revision(),
128 dependencies,
129 None,
130 None,
131 role_metadata,
132 );
133 let deferred_plan = DeferredPlan {
134 ctx,
135 plan,
136 validity,
137 requires_locks: BTreeSet::default(),
138 };
139 let acquire_future = wait_future.map(|()| None);
142
143 self.defer_op(acquire_future, DeferredOp::Plan(deferred_plan));
144
145 return;
148 };
149
150 let target_cluster = match ctx.session().transaction().cluster() {
152 Some(cluster_id) => TargetCluster::Transaction(cluster_id),
154 None => {
156 let session_catalog = self.catalog.for_session(ctx.session());
157 catalog_serving::auto_run_on_catalog_server(
158 &session_catalog,
159 ctx.session(),
160 &plan,
161 )
162 }
163 };
164 let (target_cluster_id, target_cluster_name) = match self
165 .catalog()
166 .resolve_target_cluster(target_cluster, ctx.session())
167 {
168 Ok(cluster) => (Some(cluster.id), Some(cluster.name.clone())),
169 Err(_) => (None, None),
170 };
171
172 if let (Some(cluster_id), Some(statement_id)) =
173 (target_cluster_id, ctx.extra().contents())
174 {
175 self.set_statement_execution_cluster(statement_id, cluster_id);
176 }
177
178 let session_catalog = self.catalog.for_session(ctx.session());
179
180 if let Some(cluster_name) = &target_cluster_name {
181 if let Err(e) = catalog_serving::check_cluster_restrictions(
182 cluster_name,
183 &session_catalog,
184 &plan,
185 ) {
186 return ctx.retire(Err(e));
187 }
188 }
189
190 if let Err(e) = rbac::check_plan(
191 &session_catalog,
192 Some(|id| {
193 self.active_conns()
196 .into_iter()
197 .find(|(conn_id, _)| conn_id.unhandled() == id)
198 .map(|(_, conn_meta)| *conn_meta.authenticated_role_id())
199 }),
200 ctx.session(),
201 &plan,
202 target_cluster_id,
203 &resolved_ids,
204 ) {
205 return ctx.retire(Err(e.into()));
206 }
207
208 match plan {
209 Plan::CreateSource(plan) => {
210 let (item_id, global_id) = return_if_err!(self.allocate_user_id().await, ctx);
211 let result = self
212 .sequence_create_source(
213 &mut ctx,
214 vec![CreateSourcePlanBundle {
215 item_id,
216 global_id,
217 plan,
218 resolved_ids,
219 available_source_references: None,
220 }],
221 )
222 .await;
223 ctx.retire(result);
224 }
225 Plan::CreateSources(plans) => {
226 assert!(
227 resolved_ids.is_empty(),
228 "each plan has separate resolved_ids"
229 );
230 let result = self.sequence_create_source(&mut ctx, plans).await;
231 ctx.retire(result);
232 }
233 Plan::CreateConnection(plan) => {
234 self.sequence_create_connection(ctx, plan, resolved_ids)
235 .await;
236 }
237 Plan::CreateDatabase(plan) => {
238 let result = self.sequence_create_database(ctx.session_mut(), plan).await;
239 ctx.retire(result);
240 }
241 Plan::CreateSchema(plan) => {
242 let result = self.sequence_create_schema(ctx.session_mut(), plan).await;
243 ctx.retire(result);
244 }
245 Plan::CreateRole(plan) => {
246 let result = self
247 .sequence_create_role(Some(ctx.session().conn_id()), plan)
248 .await;
249 if let Some(notice) = self.should_emit_rbac_notice(ctx.session()) {
250 ctx.session().add_notice(notice);
251 }
252 ctx.retire(result);
253 }
254 Plan::CreateCluster(plan) => {
255 let result = self.sequence_create_cluster(ctx.session(), plan).await;
256 ctx.retire(result);
257 }
258 Plan::CreateClusterReplica(plan) => {
259 let result = self
260 .sequence_create_cluster_replica(ctx.session(), plan)
261 .await;
262 ctx.retire(result);
263 }
264 Plan::CreateTable(plan) => {
265 let result = self
266 .sequence_create_table(&mut ctx, plan, resolved_ids)
267 .await;
268 ctx.retire(result);
269 }
270 Plan::CreateSecret(plan) => {
271 self.sequence_create_secret(ctx, plan).await;
272 }
273 Plan::CreateSink(plan) => {
274 self.sequence_create_sink(ctx, plan, resolved_ids).await;
275 }
276 Plan::CreateView(plan) => {
277 self.sequence_create_view(ctx, plan, resolved_ids).await;
278 }
279 Plan::CreateMaterializedView(plan) => {
280 self.sequence_create_materialized_view(ctx, plan, resolved_ids)
281 .await;
282 }
283 Plan::CreateContinualTask(plan) => {
284 let res = self
285 .sequence_create_continual_task(&mut ctx, plan, resolved_ids)
286 .await;
287 ctx.retire(res);
288 }
289 Plan::CreateIndex(plan) => {
290 self.sequence_create_index(ctx, plan, resolved_ids).await;
291 }
292 Plan::CreateType(plan) => {
293 let result = self
294 .sequence_create_type(ctx.session(), plan, resolved_ids)
295 .await;
296 ctx.retire(result);
297 }
298 Plan::CreateNetworkPolicy(plan) => {
299 let res = self
300 .sequence_create_network_policy(ctx.session(), plan)
301 .await;
302 ctx.retire(res);
303 }
304 Plan::Comment(plan) => {
305 let result = self.sequence_comment_on(ctx.session(), plan).await;
306 ctx.retire(result);
307 }
308 Plan::CopyTo(plan) => {
309 self.sequence_copy_to(ctx, plan, target_cluster).await;
310 }
311 Plan::DropObjects(plan) => {
312 let result = self.sequence_drop_objects(&mut ctx, plan).await;
313 ctx.retire(result);
314 }
315 Plan::DropOwned(plan) => {
316 let result = self.sequence_drop_owned(ctx.session_mut(), plan).await;
317 ctx.retire(result);
318 }
319 Plan::EmptyQuery => {
320 ctx.retire(Ok(ExecuteResponse::EmptyQuery));
321 }
322 Plan::ShowAllVariables => {
323 let result = self.sequence_show_all_variables(ctx.session());
324 ctx.retire(result);
325 }
326 Plan::ShowVariable(plan) => {
327 let result = self.sequence_show_variable(ctx.session(), plan);
328 ctx.retire(result);
329 }
330 Plan::InspectShard(plan) => {
331 let result = self.sequence_inspect_shard(ctx.session(), plan).await;
333 ctx.retire(result);
334 }
335 Plan::SetVariable(plan) => {
336 let result = self.sequence_set_variable(ctx.session_mut(), plan);
337 ctx.retire(result);
338 }
339 Plan::ResetVariable(plan) => {
340 let result = self.sequence_reset_variable(ctx.session_mut(), plan);
341 ctx.retire(result);
342 }
343 Plan::SetTransaction(plan) => {
344 let result = self.sequence_set_transaction(ctx.session_mut(), plan);
345 ctx.retire(result);
346 }
347 Plan::StartTransaction(plan) => {
348 if matches!(
349 ctx.session().transaction(),
350 TransactionStatus::InTransaction(_)
351 ) {
352 ctx.session()
353 .add_notice(AdapterNotice::ExistingTransactionInProgress);
354 }
355 let result = ctx.session_mut().start_transaction(
356 self.now_datetime(),
357 plan.access,
358 plan.isolation_level,
359 );
360 ctx.retire(result.map(|_| ExecuteResponse::StartedTransaction))
361 }
362 Plan::CommitTransaction(CommitTransactionPlan {
363 ref transaction_type,
364 })
365 | Plan::AbortTransaction(AbortTransactionPlan {
366 ref transaction_type,
367 }) => {
368 if ctx.session().transaction().is_ddl() {
371 if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
372 let prev = self
373 .active_conns
374 .get_mut(ctx.session().conn_id())
375 .expect("connection must exist")
376 .deferred_lock
377 .replace(guard);
378 assert!(
379 prev.is_none(),
380 "connections should have at most one lock guard"
381 );
382 } else {
383 self.serialized_ddl.push_back(DeferredPlanStatement {
384 ctx,
385 ps: PlanStatement::Plan { plan, resolved_ids },
386 });
387 return;
388 }
389 }
390
391 let action = match &plan {
392 Plan::CommitTransaction(_) => EndTransactionAction::Commit,
393 Plan::AbortTransaction(_) => EndTransactionAction::Rollback,
394 _ => unreachable!(),
395 };
396 if ctx.session().transaction().is_implicit() && !transaction_type.is_implicit()
397 {
398 ctx.session().add_notice(
403 AdapterNotice::ExplicitTransactionControlInImplicitTransaction,
404 );
405 }
406 self.sequence_end_transaction(ctx, action).await;
407 }
408 Plan::Select(plan) => {
409 let max = Some(ctx.session().vars().max_query_result_size());
410 self.sequence_peek(ctx, plan, target_cluster, max).await;
411 }
412 Plan::Subscribe(plan) => {
413 self.sequence_subscribe(ctx, plan, target_cluster).await;
414 }
415 Plan::SideEffectingFunc(plan) => {
416 self.sequence_side_effecting_func(ctx, plan).await;
417 }
418 Plan::ShowCreate(plan) => {
419 ctx.retire(Ok(Self::send_immediate_rows(plan.row)));
420 }
421 Plan::ShowColumns(show_columns_plan) => {
422 let max = Some(ctx.session().vars().max_query_result_size());
423 self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster, max)
424 .await;
425 }
426 Plan::CopyFrom(plan) => match plan.source {
427 CopyFromSource::Stdin => {
428 let (tx, _, session, ctx_extra) = ctx.into_parts();
429 tx.send(
430 Ok(ExecuteResponse::CopyFrom {
431 target_id: plan.target_id,
432 target_name: plan.target_name,
433 columns: plan.columns,
434 params: plan.params,
435 ctx_extra,
436 }),
437 session,
438 );
439 }
440 CopyFromSource::Url(_) | CopyFromSource::AwsS3 { .. } => {
441 self.sequence_copy_from(ctx, plan, target_cluster).await;
442 }
443 },
444 Plan::ExplainPlan(plan) => {
445 self.sequence_explain_plan(ctx, plan, target_cluster).await;
446 }
447 Plan::ExplainPushdown(plan) => {
448 self.sequence_explain_pushdown(ctx, plan, target_cluster)
449 .await;
450 }
451 Plan::ExplainSinkSchema(plan) => {
452 let result = self.sequence_explain_schema(plan);
453 ctx.retire(result);
454 }
455 Plan::ExplainTimestamp(plan) => {
456 self.sequence_explain_timestamp(ctx, plan, target_cluster)
457 .await;
458 }
459 Plan::Insert(plan) => {
460 self.sequence_insert(ctx, plan).await;
461 }
462 Plan::ReadThenWrite(plan) => {
463 self.sequence_read_then_write(ctx, plan).await;
464 }
465 Plan::AlterNoop(plan) => {
466 ctx.retire(Ok(ExecuteResponse::AlteredObject(plan.object_type)));
467 }
468 Plan::AlterCluster(plan) => {
469 self.sequence_alter_cluster_staged(ctx, plan).await;
470 }
471 Plan::AlterClusterRename(plan) => {
472 let result = self.sequence_alter_cluster_rename(&mut ctx, plan).await;
473 ctx.retire(result);
474 }
475 Plan::AlterClusterSwap(plan) => {
476 let result = self.sequence_alter_cluster_swap(&mut ctx, plan).await;
477 ctx.retire(result);
478 }
479 Plan::AlterClusterReplicaRename(plan) => {
480 let result = self
481 .sequence_alter_cluster_replica_rename(ctx.session(), plan)
482 .await;
483 ctx.retire(result);
484 }
485 Plan::AlterConnection(plan) => {
486 self.sequence_alter_connection(ctx, plan).await;
487 }
488 Plan::AlterSetCluster(plan) => {
489 let result = self.sequence_alter_set_cluster(ctx.session(), plan).await;
490 ctx.retire(result);
491 }
492 Plan::AlterRetainHistory(plan) => {
493 let result = self.sequence_alter_retain_history(&mut ctx, plan).await;
494 ctx.retire(result);
495 }
496 Plan::AlterSourceTimestampInterval(plan) => {
497 let result = self
498 .sequence_alter_source_timestamp_interval(&mut ctx, plan)
499 .await;
500 ctx.retire(result);
501 }
502 Plan::AlterItemRename(plan) => {
503 let result = self.sequence_alter_item_rename(&mut ctx, plan).await;
504 ctx.retire(result);
505 }
506 Plan::AlterSchemaRename(plan) => {
507 let result = self.sequence_alter_schema_rename(&mut ctx, plan).await;
508 ctx.retire(result);
509 }
510 Plan::AlterSchemaSwap(plan) => {
511 let result = self.sequence_alter_schema_swap(&mut ctx, plan).await;
512 ctx.retire(result);
513 }
514 Plan::AlterRole(plan) => {
515 let result = self.sequence_alter_role(ctx.session_mut(), plan).await;
516 ctx.retire(result);
517 }
518 Plan::AlterSecret(plan) => {
519 self.sequence_alter_secret(ctx, plan).await;
520 }
521 Plan::AlterSink(plan) => {
522 self.sequence_alter_sink_prepare(ctx, plan).await;
523 }
524 Plan::AlterSource(plan) => {
525 let result = self.sequence_alter_source(ctx.session_mut(), plan).await;
526 ctx.retire(result);
527 }
528 Plan::AlterSystemSet(plan) => {
529 let result = self.sequence_alter_system_set(ctx.session(), plan).await;
530 ctx.retire(result);
531 }
532 Plan::AlterSystemReset(plan) => {
533 let result = self.sequence_alter_system_reset(ctx.session(), plan).await;
534 ctx.retire(result);
535 }
536 Plan::AlterSystemResetAll(plan) => {
537 let result = self
538 .sequence_alter_system_reset_all(ctx.session(), plan)
539 .await;
540 ctx.retire(result);
541 }
542 Plan::AlterTableAddColumn(plan) => {
543 let result = self.sequence_alter_table(&mut ctx, plan).await;
544 ctx.retire(result);
545 }
546 Plan::AlterMaterializedViewApplyReplacement(plan) => {
547 self.sequence_alter_materialized_view_apply_replacement_prepare(ctx, plan)
548 .await;
549 }
550 Plan::AlterNetworkPolicy(plan) => {
551 let res = self
552 .sequence_alter_network_policy(ctx.session(), plan)
553 .await;
554 ctx.retire(res);
555 }
556 Plan::DiscardTemp => {
557 self.drop_temp_items(ctx.session().conn_id()).await;
558 ctx.retire(Ok(ExecuteResponse::DiscardedTemp));
559 }
560 Plan::DiscardAll => {
561 let ret = if let TransactionStatus::Started(_) = ctx.session().transaction() {
562 self.clear_transaction(ctx.session_mut()).await;
563 self.drop_temp_items(ctx.session().conn_id()).await;
564 ctx.session_mut().reset();
565 Ok(ExecuteResponse::DiscardedAll)
566 } else {
567 Err(AdapterError::OperationProhibitsTransaction(
568 "DISCARD ALL".into(),
569 ))
570 };
571 ctx.retire(ret);
572 }
573 Plan::Declare(plan) => {
574 self.declare(ctx, plan.name, plan.stmt, plan.sql, plan.params);
575 }
576 Plan::Fetch(FetchPlan {
577 name,
578 count,
579 timeout,
580 }) => {
581 let ctx_extra = std::mem::take(ctx.extra_mut());
582 ctx.retire(Ok(ExecuteResponse::Fetch {
583 name,
584 count,
585 timeout,
586 ctx_extra,
587 }));
588 }
589 Plan::Close(plan) => {
590 if ctx.session_mut().remove_portal(&plan.name) {
591 ctx.retire(Ok(ExecuteResponse::ClosedCursor));
592 } else {
593 ctx.retire(Err(AdapterError::UnknownCursor(plan.name)));
594 }
595 }
596 Plan::Prepare(plan) => {
597 if ctx
598 .session()
599 .get_prepared_statement_unverified(&plan.name)
600 .is_some()
601 {
602 ctx.retire(Err(AdapterError::PreparedStatementExists(plan.name)));
603 } else {
604 let state_revision = StateRevision {
605 catalog_revision: self.catalog().transient_revision(),
606 session_state_revision: ctx.session().state_revision(),
607 };
608 ctx.session_mut().set_prepared_statement(
609 plan.name,
610 Some(plan.stmt),
611 plan.sql,
612 plan.desc,
613 state_revision,
614 self.now(),
615 );
616 ctx.retire(Ok(ExecuteResponse::Prepare));
617 }
618 }
619 Plan::Execute(plan) => {
620 match self.sequence_execute(ctx.session_mut(), plan) {
621 Ok(portal_name) => {
622 let (tx, _, session, extra) = ctx.into_parts();
623 self.internal_cmd_tx
624 .send(Message::Command(
625 OpenTelemetryContext::obtain(),
626 Command::Execute {
627 portal_name,
628 session,
629 tx: tx.take(),
630 outer_ctx_extra: Some(extra),
631 },
632 ))
633 .expect("sending to self.internal_cmd_tx cannot fail");
634 }
635 Err(err) => ctx.retire(Err(err)),
636 };
637 }
638 Plan::Deallocate(plan) => match plan.name {
639 Some(name) => {
640 if ctx.session_mut().remove_prepared_statement(&name) {
641 ctx.retire(Ok(ExecuteResponse::Deallocate { all: false }));
642 } else {
643 ctx.retire(Err(AdapterError::UnknownPreparedStatement(name)));
644 }
645 }
646 None => {
647 ctx.session_mut().remove_all_prepared_statements();
648 ctx.retire(Ok(ExecuteResponse::Deallocate { all: true }));
649 }
650 },
651 Plan::Raise(RaisePlan { severity }) => {
652 ctx.session()
653 .add_notice(AdapterNotice::UserRequested { severity });
654 ctx.retire(Ok(ExecuteResponse::Raised));
655 }
656 Plan::GrantPrivileges(plan) => {
657 let result = self
658 .sequence_grant_privileges(ctx.session_mut(), plan)
659 .await;
660 ctx.retire(result);
661 }
662 Plan::RevokePrivileges(plan) => {
663 let result = self
664 .sequence_revoke_privileges(ctx.session_mut(), plan)
665 .await;
666 ctx.retire(result);
667 }
668 Plan::AlterDefaultPrivileges(plan) => {
669 let result = self
670 .sequence_alter_default_privileges(ctx.session_mut(), plan)
671 .await;
672 ctx.retire(result);
673 }
674 Plan::GrantRole(plan) => {
675 let result = self.sequence_grant_role(ctx.session_mut(), plan).await;
676 ctx.retire(result);
677 }
678 Plan::RevokeRole(plan) => {
679 let result = self.sequence_revoke_role(ctx.session_mut(), plan).await;
680 ctx.retire(result);
681 }
682 Plan::AlterOwner(plan) => {
683 let result = self.sequence_alter_owner(ctx.session_mut(), plan).await;
684 ctx.retire(result);
685 }
686 Plan::ReassignOwned(plan) => {
687 let result = self.sequence_reassign_owned(ctx.session_mut(), plan).await;
688 ctx.retire(result);
689 }
690 Plan::ValidateConnection(plan) => {
691 let connection = plan
692 .connection
693 .into_inline_connection(self.catalog().state());
694 let current_storage_configuration = self.controller.storage.config().clone();
695 mz_ore::task::spawn(|| "coord::validate_connection", async move {
696 let res = match connection
697 .validate(plan.id, ¤t_storage_configuration)
698 .await
699 {
700 Ok(()) => Ok(ExecuteResponse::ValidatedConnection),
701 Err(err) => Err(err.into()),
702 };
703 ctx.retire(res);
704 });
705 }
706 }
707 }
708 .instrument(tracing::debug_span!("coord::sequencer::sequence_plan"))
709 .boxed_local()
710 }
711
    /// Executes `stmt` inside a fresh single-statement (implicit) transaction
    /// and commits it, replying to the client with the combined result.
    ///
    /// The session must not already be in a transaction (asserted below). The
    /// statement is sequenced via an internal sub-context; a spawned task then
    /// awaits its result, issues a `Commit` command back to the coordinator,
    /// and finally forwards the outcome to the original client transmitter.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sequence_execute_single_statement_transaction(
        &mut self,
        ctx: ExecuteContext,
        stmt: Arc<Statement<Raw>>,
        params: Params,
    ) {
        let (tx, internal_cmd_tx, mut session, extra) = ctx.into_parts();
        assert!(matches!(session.transaction(), TransactionStatus::Default));
        session.start_transaction_single_stmt(self.now_datetime());
        let conn_id = session.conn_id().unhandled();

        // Route the statement's response to `sub_rx` instead of the client,
        // so we can commit before replying.
        let (sub_tx, sub_rx) = oneshot::channel();
        let sub_ct = ClientTransmitter::new(sub_tx, self.internal_cmd_tx.clone());
        let sub_ctx = ExecuteContext::from_parts(sub_ct, internal_cmd_tx, session, extra);
        self.handle_execute_inner(stmt, params, sub_ctx).await;

        // Await the statement result and perform the commit off the
        // coordinator task.
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        mz_ore::task::spawn(
            || format!("execute_single_statement:{conn_id}"),
            async move {
                // If the sender was dropped, there is no one to reply to.
                let Ok(Response {
                    result,
                    session,
                    otel_ctx,
                }) = sub_rx.await
                else {
                    return;
                };
                otel_ctx.attach_as_parent();
                // Ask the coordinator to commit the implicit transaction.
                let (sub_tx, sub_rx) = oneshot::channel();
                let _ = internal_cmd_tx.send(Message::Command(
                    otel_ctx,
                    Command::Commit {
                        action: EndTransactionAction::Commit,
                        session,
                        tx: sub_tx,
                    },
                ));
                let Ok(commit_response) = sub_rx.await else {
                    return;
                };
                assert!(matches!(
                    commit_response.session.transaction(),
                    TransactionStatus::Default
                ));
                // A statement error takes precedence; otherwise report the
                // commit's result.
                let result = match (result, commit_response.result) {
                    (Ok(_), commit) => commit,
                    (Err(result), _) => Err(result),
                };
                tx.send(result, commit_response.session);
            }
            .instrument(Span::current()),
        );
    }
780
781 #[mz_ore::instrument(level = "debug")]
785 pub(crate) async fn sequence_create_role_for_startup(
786 &mut self,
787 plan: CreateRolePlan,
788 ) -> Result<ExecuteResponse, AdapterError> {
789 self.sequence_create_role(None, plan).await
796 }
797
798 pub(crate) fn allocate_transient_id(&self) -> (CatalogItemId, GlobalId) {
799 self.transient_id_gen.allocate_id()
800 }
801
802 fn should_emit_rbac_notice(&self, session: &Session) -> Option<AdapterNotice> {
803 if !rbac::is_rbac_enabled_for_session(self.catalog.system_config(), session) {
804 Some(AdapterNotice::RbacUserDisabled)
805 } else {
806 None
807 }
808 }
809
810 pub(crate) fn insert_constant(
817 catalog: &Catalog,
818 session: &mut Session,
819 target_id: CatalogItemId,
820 constants: MirRelationExpr,
821 ) -> Result<ExecuteResponse, AdapterError> {
822 let desc = match catalog.try_get_entry(&target_id) {
824 Some(table) => {
825 table.relation_desc_latest().expect("table has desc")
827 }
828 None => {
829 return Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
830 kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
831 target_id.to_string(),
832 )),
833 }));
834 }
835 };
836
837 match constants.as_const() {
838 Some((rows, ..)) => {
839 let rows = rows.clone()?;
840 for (row, _) in &rows {
841 for (i, datum) in row.iter().enumerate() {
842 desc.constraints_met(i, &datum)?;
843 }
844 }
845 let diffs_plan = plan::SendDiffsPlan {
846 id: target_id,
847 updates: rows,
848 kind: MutationKind::Insert,
849 returning: Vec::new(),
850 max_result_size: catalog.system_config().max_result_size(),
851 };
852 Self::send_diffs(session, diffs_plan)
853 }
854 None => panic!(
855 "tried using sequence_insert_constant on non-constant MirRelationExpr\n{}",
856 constants.pretty(),
857 ),
858 }
859 }
860
    /// Stages the diffs in `plan` as transaction write ops on `session` and
    /// builds the client response (row counts, or the `RETURNING` rows).
    ///
    /// Returns an error if the write ops cannot be added to the transaction
    /// or if the `RETURNING` result exceeds the allowed result size.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) fn send_diffs(
        session: &mut Session,
        mut plan: plan::SendDiffsPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Count affected rows. If all diffs are positive this is a plain sum;
        // otherwise consolidate first and sum absolute values, so retractions
        // and insertions are each counted.
        let affected_rows = {
            let mut affected_rows = Diff::from(0);
            let mut all_positive_diffs = true;
            for (_, diff) in plan.updates.iter() {
                if diff.is_negative() {
                    all_positive_diffs = false;
                    break;
                }

                affected_rows += diff;
            }

            if !all_positive_diffs {
                // Cancel out matching insert/retract pairs before counting.
                differential_dataflow::consolidation::consolidate(&mut plan.updates);

                affected_rows = Diff::ZERO;
                for (_, diff) in plan.updates.iter() {
                    affected_rows += diff.abs();
                }
            }

            usize::try_from(affected_rows.into_inner()).expect("positive Diff must fit")
        };
        event!(
            Level::TRACE,
            affected_rows,
            id = format!("{:?}", plan.id),
            kind = format!("{:?}", plan.kind),
            updates = plan.updates.len(),
            returning = plan.returning.len(),
        );

        // Stage the updates; they are applied when the transaction commits.
        session.add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
            id: plan.id,
            rows: TableData::Rows(plan.updates),
        }]))?;
        if !plan.returning.is_empty() {
            // RETURNING: finish the returned rows (no ordering/limit, project
            // all columns of the first row's arity) and send them directly.
            let finishing = RowSetFinishing {
                order_by: Vec::new(),
                limit: None,
                offset: 0,
                project: (0..plan.returning[0].0.iter().count()).collect(),
            };
            let max_returned_query_size = session.vars().max_query_result_size();
            let duration_histogram = session.metrics().row_set_finishing_seconds();

            return match finishing.finish(
                RowCollection::new(plan.returning, &finishing.order_by),
                plan.max_result_size,
                Some(max_returned_query_size),
                duration_histogram,
            ) {
                Ok((rows, _size_bytes)) => Ok(Self::send_immediate_rows(rows)),
                Err(e) => Err(AdapterError::ResultSize(e)),
            };
        }
        Ok(match plan.kind {
            MutationKind::Delete => ExecuteResponse::Deleted(affected_rows),
            MutationKind::Insert => ExecuteResponse::Inserted(affected_rows),
            // Each update contributes a retraction and an insertion to
            // `affected_rows`, hence the division by 2.
            MutationKind::Update => ExecuteResponse::Updated(affected_rows / 2),
        })
    }
937}
938
939pub(crate) fn check_log_reads(
948 catalog: &Catalog,
949 cluster: &Cluster,
950 source_ids: &BTreeSet<GlobalId>,
951 target_replica: &mut Option<ReplicaId>,
952 vars: &SessionVars,
953) -> Result<impl IntoIterator<Item = AdapterNotice>, AdapterError>
954where
955{
956 let log_names = source_ids
957 .iter()
958 .map(|gid| catalog.resolve_item_id(gid))
959 .flat_map(|item_id| catalog.introspection_dependencies(item_id))
960 .map(|item_id| catalog.get_entry(&item_id).name().item.clone())
961 .collect::<Vec<_>>();
962
963 if log_names.is_empty() {
964 return Ok(None);
965 }
966
967 let num_replicas = cluster.replicas().count();
971 if target_replica.is_none() {
972 if num_replicas == 1 {
973 *target_replica = cluster.replicas().map(|r| r.replica_id).next();
974 } else {
975 return Err(AdapterError::UntargetedLogRead { log_names });
976 }
977 }
978
979 let replica_id = target_replica.expect("set to `Some` above");
982 let replica = &cluster.replica(replica_id).expect("Replica must exist");
983 if !replica.config.compute.logging.enabled() {
984 return Err(AdapterError::IntrospectionDisabled { log_names });
985 }
986
987 Ok(vars
988 .emit_introspection_query_notice()
989 .then_some(AdapterNotice::PerReplicaLogRead { log_names }))
990}
991
992pub(crate) fn emit_optimizer_notices(
994 catalog: &Catalog,
995 session: &Session,
996 notices: &Vec<RawOptimizerNotice>,
997) {
998 if notices.is_empty() {
1000 return;
1001 }
1002 let humanizer = catalog.for_session(session);
1003 let system_vars = catalog.system_config();
1004 for notice in notices {
1005 let kind = OptimizerNoticeKind::from(notice);
1006 let notice_enabled = match kind {
1007 OptimizerNoticeKind::EqualsNull => system_vars.enable_notices_for_equals_null(),
1008 OptimizerNoticeKind::IndexAlreadyExists => {
1009 system_vars.enable_notices_for_index_already_exists()
1010 }
1011 OptimizerNoticeKind::IndexTooWideForLiteralConstraints => {
1012 system_vars.enable_notices_for_index_too_wide_for_literal_constraints()
1013 }
1014 OptimizerNoticeKind::IndexKeyEmpty => system_vars.enable_notices_for_index_empty_key(),
1015 };
1016 if notice_enabled {
1017 session.add_notice(AdapterNotice::OptimizerNotice {
1021 notice: notice.message(&humanizer, false).to_string(),
1022 hint: notice.hint(&humanizer, false).to_string(),
1023 });
1024 }
1025 session
1026 .metrics()
1027 .optimization_notices(&[kind.metric_label()])
1028 .inc_by(1);
1029 }
1030}
1031
1032pub fn eval_copy_to_uri(
1037 to: HirScalarExpr,
1038 session: &Session,
1039 catalog_state: &CatalogState,
1040) -> Result<Uri, AdapterError> {
1041 let style = ExprPrepOneShot {
1042 logical_time: EvalTime::NotAvailable,
1043 session,
1044 catalog_state,
1045 };
1046 let mut to = to.lower_uncorrelated(catalog_state.system_config())?;
1047 style.prep_scalar_expr(&mut to)?;
1048 let temp_storage = RowArena::new();
1049 let evaled = to.eval(&[], &temp_storage)?;
1050 if evaled == Datum::Null {
1051 coord_bail!("COPY TO target value can not be null");
1052 }
1053 let to_url = match Uri::from_str(evaled.unwrap_str()) {
1054 Ok(url) => {
1055 if url.scheme_str() != Some("s3") && url.scheme_str() != Some("gs") {
1056 coord_bail!("only 's3://...' and 'gs://...' urls are supported as COPY TO target");
1057 }
1058 url
1059 }
1060 Err(e) => coord_bail!("could not parse COPY TO target url: {}", e),
1061 };
1062 Ok(to_url)
1063}
1064
/// Builds the future that computes `EXPLAIN FILTER PUSHDOWN` results.
///
/// For each imported collection, fetches persist snapshot part statistics at
/// `as_of` and counts how many parts (and bytes) would be selected by the
/// part-stats filter for the given MFP, with `mz_now` constrained to the
/// provided `ResultSpec`. The returned future collects one result row per
/// import, subject to the session's `statement_timeout`.
pub(crate) async fn explain_pushdown_future_inner<
    I: IntoIterator<Item = (GlobalId, MapFilterProject)>,
>(
    session: &Session,
    catalog: &Catalog,
    storage_collections: &Arc<dyn StorageCollections + Send + Sync>,
    as_of: Antichain<Timestamp>,
    mz_now: ResultSpec<'static>,
    imports: I,
) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
    let explain_timeout = *session.vars().statement_timeout();
    // One future per import; `FuturesOrdered` preserves the import order in
    // the output rows.
    let mut futures = FuturesOrdered::new();
    for (id, mfp) in imports {
        let catalog_entry = catalog.get_entry_by_global_id(&id);
        let full_name = catalog
            .for_session(session)
            .resolve_full_name(&catalog_entry.name);
        let name = format!("{}", full_name);
        let relation_desc = catalog_entry
            .relation_desc()
            .expect("source should have a proper desc")
            .into_owned();
        // Kick off the stats request now; the inner future awaits its result.
        let stats_future = storage_collections
            .snapshot_parts_stats(id, as_of.clone())
            .await;

        let mz_now = mz_now.clone();
        futures.push_back(async move {
            let snapshot_stats = match stats_future.await {
                Ok(stats) => stats,
                Err(e) => return Err(e),
            };
            // Tally total vs. selected parts/bytes across the snapshot.
            let mut total_bytes = 0;
            let mut total_parts = 0;
            let mut selected_bytes = 0;
            let mut selected_parts = 0;
            for SnapshotPartStats {
                encoded_size_bytes: bytes,
                stats,
            } in &snapshot_stats.parts
            {
                let bytes = u64::cast_from(*bytes);
                total_bytes += bytes;
                total_parts += 1u64;
                // A part without stats cannot be ruled out, so it counts as
                // selected.
                let selected = match stats {
                    None => true,
                    Some(stats) => {
                        let stats = stats.decode();
                        let stats = RelationPartStats::new(
                            name.as_str(),
                            &snapshot_stats.metrics.pushdown.part_stats,
                            &relation_desc,
                            &stats,
                        );
                        stats.may_match_mfp(mz_now.clone(), &mfp)
                    }
                };

                if selected {
                    selected_bytes += bytes;
                    selected_parts += 1u64;
                }
            }
            Ok(Row::pack_slice(&[
                name.as_str().into(),
                total_bytes.into(),
                selected_bytes.into(),
                total_parts.into(),
                selected_parts.into(),
            ]))
        });
    }

    // Collect all per-import rows, bounded by the statement timeout.
    let fut = async move {
        match tokio::time::timeout(
            explain_timeout,
            futures::TryStreamExt::try_collect::<Vec<_>>(futures),
        )
        .await
        {
            Ok(Ok(rows)) => Ok(ExecuteResponse::SendingRowsImmediate {
                rows: Box::new(rows.into_row_iter()),
            }),
            Ok(Err(err)) => Err(err.into()),
            Err(_) => Err(AdapterError::StatementTimeout),
        }
    };
    fut
}
1164
1165pub(crate) async fn explain_plan_inner(
1168 session: &Session,
1169 catalog: &Catalog,
1170 df_meta: DataflowMetainfo,
1171 explain_ctx: ExplainPlanContext,
1172 optimizer: peek::Optimizer,
1173 insights_ctx: Option<Box<PlanInsightsContext>>,
1174) -> Result<Vec<Row>, AdapterError> {
1175 let ExplainPlanContext {
1176 config,
1177 format,
1178 stage,
1179 desc,
1180 optimizer_trace,
1181 ..
1182 } = explain_ctx;
1183
1184 let desc = desc.expect("RelationDesc for SelectPlan in EXPLAIN mode");
1185
1186 let session_catalog = catalog.for_session(session);
1187 let expr_humanizer = {
1188 let transient_items = btreemap! {
1189 optimizer.select_id() => TransientItem::new(
1190 Some(vec![GlobalId::Explain.to_string()]),
1191 Some(desc.iter_names().map(|c| c.to_string()).collect()),
1192 )
1193 };
1194 ExprHumanizerExt::new(transient_items, &session_catalog)
1195 };
1196
1197 let finishing = if optimizer.finishing().is_trivial(desc.arity()) {
1198 None
1199 } else {
1200 Some(optimizer.finishing().clone())
1201 };
1202
1203 let target_cluster = catalog.get_cluster(optimizer.cluster_id());
1204 let features = optimizer.config().features.clone();
1205
1206 let rows = optimizer_trace
1207 .into_rows(
1208 format,
1209 &config,
1210 &features,
1211 &expr_humanizer,
1212 finishing,
1213 Some(target_cluster),
1214 df_meta,
1215 stage,
1216 plan::ExplaineeStatementKind::Select,
1217 insights_ctx,
1218 )
1219 .await?;
1220
1221 Ok(rows)
1222}
1223
1224pub(crate) async fn statistics_oracle(
1229 session: &Session,
1230 source_ids: &BTreeSet<GlobalId>,
1231 query_as_of: &Antichain<Timestamp>,
1232 is_oneshot: bool,
1233 system_config: &vars::SystemVars,
1234 storage_collections: &dyn StorageCollections,
1235) -> Result<Box<dyn StatisticsOracle>, AdapterError> {
1236 if !session.vars().enable_session_cardinality_estimates() {
1237 let stats: Box<dyn StatisticsOracle> = Box::new(EmptyStatisticsOracle);
1238 return Ok(stats);
1239 }
1240
1241 let timeout = if is_oneshot {
1242 system_config.optimizer_oneshot_stats_timeout()
1244 } else {
1245 system_config.optimizer_stats_timeout()
1246 };
1247
1248 let cached_stats = mz_ore::future::timeout(
1249 timeout,
1250 CachedStatisticsOracle::new(source_ids, query_as_of, storage_collections),
1251 )
1252 .await;
1253
1254 match cached_stats {
1255 Ok(stats) => Ok(Box::new(stats)),
1256 Err(mz_ore::future::TimeoutError::DeadlineElapsed) => {
1257 warn!(
1258 is_oneshot = is_oneshot,
1259 "optimizer statistics collection timed out after {}ms",
1260 timeout.as_millis()
1261 );
1262
1263 Ok(Box::new(EmptyStatisticsOracle))
1264 }
1265 Err(mz_ore::future::TimeoutError::Inner(e)) => Err(AdapterError::Storage(e)),
1266 }
1267}
1268
/// A [`StatisticsOracle`] backed by a point-in-time snapshot of per-collection
/// update counts, fetched once at construction time.
#[derive(Debug)]
struct CachedStatisticsOracle {
    // Snapshot update count per collection, as reported by storage.
    cache: BTreeMap<GlobalId, usize>,
}
1273
impl CachedStatisticsOracle {
    /// Builds the cache by fetching snapshot statistics for each of `ids` at
    /// the given `as_of`.
    ///
    /// Collections without statistics (`IdentifierMissing`) are skipped with
    /// a debug log; any other storage error aborts construction.
    pub async fn new(
        ids: &BTreeSet<GlobalId>,
        as_of: &Antichain<Timestamp>,
        storage_collections: &dyn StorageCollections,
    ) -> Result<Self, StorageError> {
        let mut cache = BTreeMap::new();

        for id in ids {
            let stats = storage_collections.snapshot_stats(*id, as_of.clone()).await;

            match stats {
                Ok(stats) => {
                    cache.insert(*id, stats.num_updates);
                }
                // Best effort: a collection with no stats simply contributes
                // nothing to the cache.
                Err(StorageError::IdentifierMissing(id)) => {
                    ::tracing::debug!("no statistics for {id}")
                }
                Err(e) => return Err(e),
            }
        }

        Ok(Self { cache })
    }
}
1299
1300impl StatisticsOracle for CachedStatisticsOracle {
1301 fn cardinality_estimate(&self, id: GlobalId) -> Option<usize> {
1302 self.cache.get(&id).map(|estimate| *estimate)
1303 }
1304
1305 fn as_map(&self) -> BTreeMap<GlobalId, usize> {
1306 self.cache.clone()
1307 }
1308}