1#![allow(clippy::pub_use)]
12
13use std::collections::{BTreeMap, BTreeSet};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use futures::FutureExt;
20use futures::future::LocalBoxFuture;
21use futures::stream::FuturesOrdered;
22use http::Uri;
23use inner::return_if_err;
24use maplit::btreemap;
25use mz_catalog::memory::objects::Cluster;
26use mz_controller_types::ReplicaId;
27use mz_expr::row::RowCollection;
28use mz_expr::{MapFilterProject, MirRelationExpr, ResultSpec, RowSetFinishing};
29use mz_ore::cast::CastFrom;
30use mz_ore::tracing::OpenTelemetryContext;
31use mz_persist_client::stats::SnapshotPartStats;
32use mz_repr::explain::{ExprHumanizerExt, TransientItem};
33use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, Row, RowArena, Timestamp};
34use mz_sql::catalog::{CatalogError, SessionCatalog};
35use mz_sql::names::ResolvedIds;
36use mz_sql::plan::{
37 self, AbortTransactionPlan, CommitTransactionPlan, CopyFromSource, CreateRolePlan,
38 CreateSourcePlanBundle, FetchPlan, HirScalarExpr, MutationKind, Params, Plan, PlanKind,
39 RaisePlan,
40};
41use mz_sql::rbac;
42use mz_sql::session::metadata::SessionMetadata;
43use mz_sql::session::vars;
44use mz_sql::session::vars::SessionVars;
45use mz_sql_parser::ast::{Raw, Statement};
46use mz_storage_client::client::TableData;
47use mz_storage_client::storage_collections::StorageCollections;
48use mz_storage_types::connections::inline::IntoInlineConnection;
49use mz_storage_types::controller::StorageError;
50use mz_storage_types::stats::RelationPartStats;
51use mz_transform::dataflow::DataflowMetainfo;
52use mz_transform::notice::{OptimizerNoticeApi, OptimizerNoticeKind, RawOptimizerNotice};
53use mz_transform::{EmptyStatisticsOracle, StatisticsOracle};
54use timely::progress::Antichain;
55use timely::progress::Timestamp as TimelyTimestamp;
56use tokio::sync::oneshot;
57use tracing::{Instrument, Level, Span, event, warn};
58
59use crate::ExecuteContext;
60use crate::catalog::{Catalog, CatalogState};
61use crate::command::{Command, ExecuteResponse, Response};
62use crate::coord::appends::{DeferredOp, DeferredPlan};
63use crate::coord::validity::PlanValidity;
64use crate::coord::{
65 Coordinator, DeferredPlanStatement, ExplainPlanContext, Message, PlanStatement, TargetCluster,
66 catalog_serving,
67};
68use crate::error::AdapterError;
69use crate::explain::insights::PlanInsightsContext;
70use crate::notice::AdapterNotice;
71use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
72use crate::optimize::peek;
73use crate::session::{
74 EndTransactionAction, Session, StateRevision, TransactionOps, TransactionStatus, WriteOp,
75};
76use crate::util::ClientTransmitter;
77
78mod inner;
98
99impl Coordinator {
    /// Dispatches a sequenced [`Plan`] to the matching `sequence_*` handler,
    /// after running the shared pre-flight checks: read-only mode rejection,
    /// deferral behind in-flight startup appends, target-cluster resolution,
    /// per-cluster statement restrictions, and RBAC validation.
    ///
    /// Every branch either retires `ctx` with the execution result or hands
    /// `ctx` off to a sub-task/deferred queue that will retire it later.
    pub(crate) fn sequence_plan(
        &mut self,
        mut ctx: ExecuteContext,
        plan: Plan,
        resolved_ids: ResolvedIds,
    ) -> LocalBoxFuture<'_, ()> {
        async move {
            // Constrain the client transmitter to the response kinds this
            // plan can legitimately produce.
            let responses = ExecuteResponse::generated_from(&PlanKind::from(&plan));
            ctx.tx_mut().set_allowed(responses);

            if self.controller.read_only() && !plan.allowed_in_read_only() {
                ctx.retire(Err(AdapterError::ReadOnly));
                return;
            }

            // If this plan reads from collections whose startup appends have
            // not yet landed, defer it until those appends complete.
            if let Some((dependencies, wait_future)) =
                super::appends::waiting_on_startup_appends(self.catalog(), ctx.session_mut(), &plan)
            {
                let conn_id = ctx.session().conn_id();
                tracing::debug!(%conn_id, "deferring plan for startup appends");

                let role_metadata = ctx.session().role_metadata().clone();
                let validity = PlanValidity::new(
                    self.catalog.transient_revision(),
                    dependencies,
                    None,
                    None,
                    role_metadata,
                );
                let deferred_plan = DeferredPlan {
                    ctx,
                    plan,
                    validity,
                    requires_locks: BTreeSet::default(),
                };
                let acquire_future = wait_future.map(|()| None);

                self.defer_op(acquire_future, DeferredOp::Plan(deferred_plan));

                return;
            };

            // Resolve the cluster this plan should run on: pinned by an open
            // transaction, or chosen automatically (possibly the mz_catalog_server
            // cluster for introspection-style queries).
            let target_cluster = match ctx.session().transaction().cluster() {
                Some(cluster_id) => TargetCluster::Transaction(cluster_id),
                None => {
                    let session_catalog = self.catalog.for_session(ctx.session());
                    catalog_serving::auto_run_on_catalog_server(
                        &session_catalog,
                        ctx.session(),
                        &plan,
                    )
                }
            };
            // Resolution can fail (e.g. dropped cluster); errors are surfaced
            // later by the per-plan handler, so tolerate `None` here.
            let (target_cluster_id, target_cluster_name) = match self
                .catalog()
                .resolve_target_cluster(target_cluster, ctx.session())
            {
                Ok(cluster) => (Some(cluster.id), Some(cluster.name.clone())),
                Err(_) => (None, None),
            };

            // Record the chosen cluster for statement logging, if both sides
            // are known.
            if let (Some(cluster_id), Some(statement_id)) =
                (target_cluster_id, ctx.extra().contents())
            {
                self.set_statement_execution_cluster(statement_id, cluster_id);
            }

            let session_catalog = self.catalog.for_session(ctx.session());

            if let Some(cluster_name) = &target_cluster_name {
                if let Err(e) = catalog_serving::check_cluster_restrictions(
                    cluster_name,
                    &session_catalog,
                    &plan,
                ) {
                    return ctx.retire(Err(e));
                }
            }

            // RBAC check; the closure resolves a connection id to the
            // authenticated role of its active connection.
            if let Err(e) = rbac::check_plan(
                &session_catalog,
                Some(|id| {
                    self.active_conns()
                        .into_iter()
                        .find(|(conn_id, _)| conn_id.unhandled() == id)
                        .map(|(_, conn_meta)| *conn_meta.authenticated_role_id())
                }),
                ctx.session(),
                &plan,
                target_cluster_id,
                &resolved_ids,
            ) {
                return ctx.retire(Err(e.into()));
            }

            // Per-plan dispatch. Arms that pass `ctx` by value delegate
            // retirement to the handler; arms that borrow retire here.
            match plan {
                Plan::CreateSource(plan) => {
                    let (item_id, global_id) = return_if_err!(self.allocate_user_id().await, ctx);
                    let result = self
                        .sequence_create_source(
                            &mut ctx,
                            vec![CreateSourcePlanBundle {
                                item_id,
                                global_id,
                                plan,
                                resolved_ids,
                                available_source_references: None,
                            }],
                        )
                        .await;
                    ctx.retire(result);
                }
                Plan::CreateSources(plans) => {
                    assert!(
                        resolved_ids.is_empty(),
                        "each plan has separate resolved_ids"
                    );
                    let result = self.sequence_create_source(&mut ctx, plans).await;
                    ctx.retire(result);
                }
                Plan::CreateConnection(plan) => {
                    self.sequence_create_connection(ctx, plan, resolved_ids)
                        .await;
                }
                Plan::CreateDatabase(plan) => {
                    let result = self.sequence_create_database(ctx.session_mut(), plan).await;
                    ctx.retire(result);
                }
                Plan::CreateSchema(plan) => {
                    let result = self.sequence_create_schema(ctx.session_mut(), plan).await;
                    ctx.retire(result);
                }
                Plan::CreateRole(plan) => {
                    let result = self
                        .sequence_create_role(Some(ctx.session().conn_id()), plan)
                        .await;
                    // Warn the user if RBAC is not actually being enforced.
                    if let Some(notice) = self.should_emit_rbac_notice(ctx.session()) {
                        ctx.session().add_notice(notice);
                    }
                    ctx.retire(result);
                }
                Plan::CreateCluster(plan) => {
                    let result = self.sequence_create_cluster(ctx.session(), plan).await;
                    ctx.retire(result);
                }
                Plan::CreateClusterReplica(plan) => {
                    let result = self
                        .sequence_create_cluster_replica(ctx.session(), plan)
                        .await;
                    ctx.retire(result);
                }
                Plan::CreateTable(plan) => {
                    let result = self
                        .sequence_create_table(&mut ctx, plan, resolved_ids)
                        .await;
                    ctx.retire(result);
                }
                Plan::CreateSecret(plan) => {
                    self.sequence_create_secret(ctx, plan).await;
                }
                Plan::CreateSink(plan) => {
                    self.sequence_create_sink(ctx, plan, resolved_ids).await;
                }
                Plan::CreateView(plan) => {
                    self.sequence_create_view(ctx, plan, resolved_ids).await;
                }
                Plan::CreateMaterializedView(plan) => {
                    self.sequence_create_materialized_view(ctx, plan, resolved_ids)
                        .await;
                }
                Plan::CreateContinualTask(plan) => {
                    let res = self
                        .sequence_create_continual_task(&mut ctx, plan, resolved_ids)
                        .await;
                    ctx.retire(res);
                }
                Plan::CreateIndex(plan) => {
                    self.sequence_create_index(ctx, plan, resolved_ids).await;
                }
                Plan::CreateType(plan) => {
                    let result = self
                        .sequence_create_type(ctx.session(), plan, resolved_ids)
                        .await;
                    ctx.retire(result);
                }
                Plan::CreateNetworkPolicy(plan) => {
                    let res = self
                        .sequence_create_network_policy(ctx.session(), plan)
                        .await;
                    ctx.retire(res);
                }
                Plan::Comment(plan) => {
                    let result = self.sequence_comment_on(ctx.session(), plan).await;
                    ctx.retire(result);
                }
                Plan::CopyTo(plan) => {
                    self.sequence_copy_to(ctx, plan, target_cluster).await;
                }
                Plan::DropObjects(plan) => {
                    let result = self.sequence_drop_objects(&mut ctx, plan).await;
                    ctx.retire(result);
                }
                Plan::DropOwned(plan) => {
                    let result = self.sequence_drop_owned(ctx.session_mut(), plan).await;
                    ctx.retire(result);
                }
                Plan::EmptyQuery => {
                    ctx.retire(Ok(ExecuteResponse::EmptyQuery));
                }
                Plan::ShowAllVariables => {
                    let result = self.sequence_show_all_variables(ctx.session());
                    ctx.retire(result);
                }
                Plan::ShowVariable(plan) => {
                    let result = self.sequence_show_variable(ctx.session(), plan);
                    ctx.retire(result);
                }
                Plan::InspectShard(plan) => {
                    let result = self.sequence_inspect_shard(ctx.session(), plan).await;
                    ctx.retire(result);
                }
                Plan::SetVariable(plan) => {
                    let result = self.sequence_set_variable(ctx.session_mut(), plan);
                    ctx.retire(result);
                }
                Plan::ResetVariable(plan) => {
                    let result = self.sequence_reset_variable(ctx.session_mut(), plan);
                    ctx.retire(result);
                }
                Plan::SetTransaction(plan) => {
                    let result = self.sequence_set_transaction(ctx.session_mut(), plan);
                    ctx.retire(result);
                }
                Plan::StartTransaction(plan) => {
                    // BEGIN inside an open transaction is a notice, not an
                    // error (PostgreSQL-compatible behavior).
                    if matches!(
                        ctx.session().transaction(),
                        TransactionStatus::InTransaction(_)
                    ) {
                        ctx.session()
                            .add_notice(AdapterNotice::ExistingTransactionInProgress);
                    }
                    let result = ctx.session_mut().start_transaction(
                        self.now_datetime(),
                        plan.access,
                        plan.isolation_level,
                    );
                    ctx.retire(result.map(|_| ExecuteResponse::StartedTransaction))
                }
                Plan::CommitTransaction(CommitTransactionPlan {
                    ref transaction_type,
                })
                | Plan::AbortTransaction(AbortTransactionPlan {
                    ref transaction_type,
                }) => {
                    // Ending a DDL transaction requires the serialized-DDL
                    // lock; if unavailable, queue this plan to run later.
                    if ctx.session().transaction().is_ddl() {
                        if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
                            let prev = self
                                .active_conns
                                .get_mut(ctx.session().conn_id())
                                .expect("connection must exist")
                                .deferred_lock
                                .replace(guard);
                            assert!(
                                prev.is_none(),
                                "connections should have at most one lock guard"
                            );
                        } else {
                            self.serialized_ddl.push_back(DeferredPlanStatement {
                                ctx,
                                ps: PlanStatement::Plan { plan, resolved_ids },
                            });
                            return;
                        }
                    }

                    let action = match &plan {
                        Plan::CommitTransaction(_) => EndTransactionAction::Commit,
                        Plan::AbortTransaction(_) => EndTransactionAction::Rollback,
                        _ => unreachable!(),
                    };
                    // Explicit COMMIT/ROLLBACK in an implicit transaction
                    // warrants a notice (again PostgreSQL-compatible).
                    if ctx.session().transaction().is_implicit() && !transaction_type.is_implicit()
                    {
                        ctx.session().add_notice(
                            AdapterNotice::ExplicitTransactionControlInImplicitTransaction,
                        );
                    }
                    self.sequence_end_transaction(ctx, action).await;
                }
                Plan::Select(plan) => {
                    let max = Some(ctx.session().vars().max_query_result_size());
                    self.sequence_peek(ctx, plan, target_cluster, max).await;
                }
                Plan::Subscribe(plan) => {
                    self.sequence_subscribe(ctx, plan, target_cluster).await;
                }
                Plan::SideEffectingFunc(plan) => {
                    self.sequence_side_effecting_func(ctx, plan).await;
                }
                Plan::ShowCreate(plan) => {
                    ctx.retire(Ok(Self::send_immediate_rows(plan.row)));
                }
                Plan::ShowColumns(show_columns_plan) => {
                    // SHOW COLUMNS is sequenced as the equivalent SELECT.
                    let max = Some(ctx.session().vars().max_query_result_size());
                    self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster, max)
                        .await;
                }
                Plan::CopyFrom(plan) => match plan.source {
                    CopyFromSource::Stdin => {
                        // COPY FROM STDIN: hand control back to the protocol
                        // layer, which streams the rows.
                        let (tx, _, session, ctx_extra) = ctx.into_parts();
                        tx.send(
                            Ok(ExecuteResponse::CopyFrom {
                                target_id: plan.target_id,
                                target_name: plan.target_name,
                                columns: plan.columns,
                                params: plan.params,
                                ctx_extra,
                            }),
                            session,
                        );
                    }
                    CopyFromSource::Url(_) | CopyFromSource::AwsS3 { .. } => {
                        self.sequence_copy_from(ctx, plan, target_cluster).await;
                    }
                },
                Plan::ExplainPlan(plan) => {
                    self.sequence_explain_plan(ctx, plan, target_cluster).await;
                }
                Plan::ExplainPushdown(plan) => {
                    self.sequence_explain_pushdown(ctx, plan, target_cluster)
                        .await;
                }
                Plan::ExplainSinkSchema(plan) => {
                    let result = self.sequence_explain_schema(plan);
                    ctx.retire(result);
                }
                Plan::ExplainTimestamp(plan) => {
                    self.sequence_explain_timestamp(ctx, plan, target_cluster)
                        .await;
                }
                Plan::Insert(plan) => {
                    self.sequence_insert(ctx, plan).await;
                }
                Plan::ReadThenWrite(plan) => {
                    self.sequence_read_then_write(ctx, plan).await;
                }
                Plan::AlterNoop(plan) => {
                    ctx.retire(Ok(ExecuteResponse::AlteredObject(plan.object_type)));
                }
                Plan::AlterCluster(plan) => {
                    self.sequence_alter_cluster_staged(ctx, plan).await;
                }
                Plan::AlterClusterRename(plan) => {
                    let result = self.sequence_alter_cluster_rename(&mut ctx, plan).await;
                    ctx.retire(result);
                }
                Plan::AlterClusterSwap(plan) => {
                    let result = self.sequence_alter_cluster_swap(&mut ctx, plan).await;
                    ctx.retire(result);
                }
                Plan::AlterClusterReplicaRename(plan) => {
                    let result = self
                        .sequence_alter_cluster_replica_rename(ctx.session(), plan)
                        .await;
                    ctx.retire(result);
                }
                Plan::AlterConnection(plan) => {
                    self.sequence_alter_connection(ctx, plan).await;
                }
                Plan::AlterSetCluster(plan) => {
                    let result = self.sequence_alter_set_cluster(ctx.session(), plan).await;
                    ctx.retire(result);
                }
                Plan::AlterRetainHistory(plan) => {
                    let result = self.sequence_alter_retain_history(&mut ctx, plan).await;
                    ctx.retire(result);
                }
                Plan::AlterSourceTimestampInterval(plan) => {
                    let result = self
                        .sequence_alter_source_timestamp_interval(&mut ctx, plan)
                        .await;
                    ctx.retire(result);
                }
                Plan::AlterItemRename(plan) => {
                    let result = self.sequence_alter_item_rename(&mut ctx, plan).await;
                    ctx.retire(result);
                }
                Plan::AlterSchemaRename(plan) => {
                    let result = self.sequence_alter_schema_rename(&mut ctx, plan).await;
                    ctx.retire(result);
                }
                Plan::AlterSchemaSwap(plan) => {
                    let result = self.sequence_alter_schema_swap(&mut ctx, plan).await;
                    ctx.retire(result);
                }
                Plan::AlterRole(plan) => {
                    let result = self.sequence_alter_role(ctx.session_mut(), plan).await;
                    ctx.retire(result);
                }
                Plan::AlterSecret(plan) => {
                    self.sequence_alter_secret(ctx, plan).await;
                }
                Plan::AlterSink(plan) => {
                    self.sequence_alter_sink_prepare(ctx, plan).await;
                }
                Plan::AlterSource(plan) => {
                    let result = self.sequence_alter_source(ctx.session_mut(), plan).await;
                    ctx.retire(result);
                }
                Plan::AlterSystemSet(plan) => {
                    let result = self.sequence_alter_system_set(ctx.session(), plan).await;
                    ctx.retire(result);
                }
                Plan::AlterSystemReset(plan) => {
                    let result = self.sequence_alter_system_reset(ctx.session(), plan).await;
                    ctx.retire(result);
                }
                Plan::AlterSystemResetAll(plan) => {
                    let result = self
                        .sequence_alter_system_reset_all(ctx.session(), plan)
                        .await;
                    ctx.retire(result);
                }
                Plan::AlterTableAddColumn(plan) => {
                    let result = self.sequence_alter_table(&mut ctx, plan).await;
                    ctx.retire(result);
                }
                Plan::AlterMaterializedViewApplyReplacement(plan) => {
                    self.sequence_alter_materialized_view_apply_replacement_prepare(ctx, plan)
                        .await;
                }
                Plan::AlterNetworkPolicy(plan) => {
                    let res = self
                        .sequence_alter_network_policy(ctx.session(), plan)
                        .await;
                    ctx.retire(res);
                }
                Plan::DiscardTemp => {
                    self.drop_temp_items(ctx.session().conn_id()).await;
                    ctx.retire(Ok(ExecuteResponse::DiscardedTemp));
                }
                Plan::DiscardAll => {
                    // DISCARD ALL is only valid outside an active transaction.
                    let ret = if let TransactionStatus::Started(_) = ctx.session().transaction() {
                        self.clear_transaction(ctx.session_mut()).await;
                        self.drop_temp_items(ctx.session().conn_id()).await;
                        ctx.session_mut().reset();
                        Ok(ExecuteResponse::DiscardedAll)
                    } else {
                        Err(AdapterError::OperationProhibitsTransaction(
                            "DISCARD ALL".into(),
                        ))
                    };
                    ctx.retire(ret);
                }
                Plan::Declare(plan) => {
                    self.declare(ctx, plan.name, plan.stmt, plan.sql, plan.params);
                }
                Plan::Fetch(FetchPlan {
                    name,
                    count,
                    timeout,
                }) => {
                    let ctx_extra = std::mem::take(ctx.extra_mut());
                    ctx.retire(Ok(ExecuteResponse::Fetch {
                        name,
                        count,
                        timeout,
                        ctx_extra,
                    }));
                }
                Plan::Close(plan) => {
                    if ctx.session_mut().remove_portal(&plan.name) {
                        ctx.retire(Ok(ExecuteResponse::ClosedCursor));
                    } else {
                        ctx.retire(Err(AdapterError::UnknownCursor(plan.name)));
                    }
                }
                Plan::Prepare(plan) => {
                    if ctx
                        .session()
                        .get_prepared_statement_unverified(&plan.name)
                        .is_some()
                    {
                        ctx.retire(Err(AdapterError::PreparedStatementExists(plan.name)));
                    } else {
                        // Capture the catalog/session revisions so the
                        // statement can be re-verified before execution.
                        let state_revision = StateRevision {
                            catalog_revision: self.catalog().transient_revision(),
                            session_state_revision: ctx.session().state_revision(),
                        };
                        ctx.session_mut().set_prepared_statement(
                            plan.name,
                            Some(plan.stmt),
                            plan.sql,
                            plan.desc,
                            state_revision,
                            self.now(),
                        );
                        ctx.retire(Ok(ExecuteResponse::Prepare));
                    }
                }
                Plan::Execute(plan) => {
                    // EXECUTE re-enters the coordinator as a fresh Execute
                    // command on the bound portal.
                    match self.sequence_execute(ctx.session_mut(), plan) {
                        Ok(portal_name) => {
                            let (tx, _, session, extra) = ctx.into_parts();
                            self.internal_cmd_tx
                                .send(Message::Command(
                                    OpenTelemetryContext::obtain(),
                                    Command::Execute {
                                        portal_name,
                                        session,
                                        tx: tx.take(),
                                        outer_ctx_extra: Some(extra),
                                    },
                                ))
                                .expect("sending to self.internal_cmd_tx cannot fail");
                        }
                        Err(err) => ctx.retire(Err(err)),
                    };
                }
                Plan::Deallocate(plan) => match plan.name {
                    Some(name) => {
                        if ctx.session_mut().remove_prepared_statement(&name) {
                            ctx.retire(Ok(ExecuteResponse::Deallocate { all: false }));
                        } else {
                            ctx.retire(Err(AdapterError::UnknownPreparedStatement(name)));
                        }
                    }
                    None => {
                        ctx.session_mut().remove_all_prepared_statements();
                        ctx.retire(Ok(ExecuteResponse::Deallocate { all: true }));
                    }
                },
                Plan::Raise(RaisePlan { severity }) => {
                    ctx.session()
                        .add_notice(AdapterNotice::UserRequested { severity });
                    ctx.retire(Ok(ExecuteResponse::Raised));
                }
                Plan::GrantPrivileges(plan) => {
                    let result = self
                        .sequence_grant_privileges(ctx.session_mut(), plan)
                        .await;
                    ctx.retire(result);
                }
                Plan::RevokePrivileges(plan) => {
                    let result = self
                        .sequence_revoke_privileges(ctx.session_mut(), plan)
                        .await;
                    ctx.retire(result);
                }
                Plan::AlterDefaultPrivileges(plan) => {
                    let result = self
                        .sequence_alter_default_privileges(ctx.session_mut(), plan)
                        .await;
                    ctx.retire(result);
                }
                Plan::GrantRole(plan) => {
                    let result = self.sequence_grant_role(ctx.session_mut(), plan).await;
                    ctx.retire(result);
                }
                Plan::RevokeRole(plan) => {
                    let result = self.sequence_revoke_role(ctx.session_mut(), plan).await;
                    ctx.retire(result);
                }
                Plan::AlterOwner(plan) => {
                    let result = self.sequence_alter_owner(ctx.session_mut(), plan).await;
                    ctx.retire(result);
                }
                Plan::ReassignOwned(plan) => {
                    let result = self.sequence_reassign_owned(ctx.session_mut(), plan).await;
                    ctx.retire(result);
                }
                Plan::ValidateConnection(plan) => {
                    // Validation may block on external systems, so run it on
                    // a separate task; the task retires `ctx`.
                    let connection = plan
                        .connection
                        .into_inline_connection(self.catalog().state());
                    let current_storage_configuration = self.controller.storage.config().clone();
                    mz_ore::task::spawn(|| "coord::validate_connection", async move {
                        let res = match connection
                            .validate(plan.id, &current_storage_configuration)
                            .await
                        {
                            Ok(()) => Ok(ExecuteResponse::ValidatedConnection),
                            Err(err) => Err(err.into()),
                        };
                        ctx.retire(res);
                    });
                }
            }
        }
        .instrument(tracing::debug_span!("coord::sequencer::sequence_plan"))
        .boxed_local()
    }
712
    /// Executes `stmt` as its own implicit single-statement transaction:
    /// start the transaction, run the statement, and once the result arrives
    /// commit the transaction before replying to the client.
    ///
    /// The commit round-trip is driven by a spawned task so the coordinator
    /// main loop is not blocked waiting on the statement result.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sequence_execute_single_statement_transaction(
        &mut self,
        ctx: ExecuteContext,
        stmt: Arc<Statement<Raw>>,
        params: Params,
    ) {
        let (tx, internal_cmd_tx, mut session, extra) = ctx.into_parts();
        assert!(matches!(session.transaction(), TransactionStatus::Default));
        session.start_transaction_single_stmt(self.now_datetime());
        let conn_id = session.conn_id().unhandled();

        // Run the statement with a substitute transmitter so we can
        // intercept its result before answering the real client.
        let (sub_tx, sub_rx) = oneshot::channel();
        let sub_ct = ClientTransmitter::new(sub_tx, self.internal_cmd_tx.clone());
        let sub_ctx = ExecuteContext::from_parts(sub_ct, internal_cmd_tx, session, extra);
        self.handle_execute_inner(stmt, params, sub_ctx).await;

        let internal_cmd_tx = self.internal_cmd_tx.clone();
        mz_ore::task::spawn(
            || format!("execute_single_statement:{conn_id}"),
            async move {
                // If the substitute context was dropped without responding,
                // there is nothing to commit or report.
                let Ok(Response {
                    result,
                    session,
                    otel_ctx,
                }) = sub_rx.await
                else {
                    return;
                };
                otel_ctx.attach_as_parent();
                // Ask the coordinator to commit the implicit transaction.
                let (sub_tx, sub_rx) = oneshot::channel();
                let _ = internal_cmd_tx.send(Message::Command(
                    otel_ctx,
                    Command::Commit {
                        action: EndTransactionAction::Commit,
                        session,
                        tx: sub_tx,
                    },
                ));
                let Ok(commit_response) = sub_rx.await else {
                    return;
                };
                assert!(matches!(
                    commit_response.session.transaction(),
                    TransactionStatus::Default
                ));
                // A statement error takes precedence; otherwise report the
                // commit's outcome to the client.
                let result = match (result, commit_response.result) {
                    (Ok(_), commit) => commit,
                    (Err(result), _) => Err(result),
                };
                tx.send(result, commit_response.session);
            }
            .instrument(Span::current()),
        );
    }
781
782 #[mz_ore::instrument(level = "debug")]
786 pub(crate) async fn sequence_create_role_for_startup(
787 &mut self,
788 plan: CreateRolePlan,
789 ) -> Result<ExecuteResponse, AdapterError> {
790 self.sequence_create_role(None, plan).await
797 }
798
    /// Allocates the next transient (item id, global id) pair, used for
    /// ephemeral objects that never reach the durable catalog.
    pub(crate) fn allocate_transient_id(&self) -> (CatalogItemId, GlobalId) {
        self.transient_id_gen.allocate_id()
    }
802
803 fn should_emit_rbac_notice(&self, session: &Session) -> Option<AdapterNotice> {
804 if !rbac::is_rbac_enabled_for_session(self.catalog.system_config(), session) {
805 Some(AdapterNotice::RbacUserDisabled)
806 } else {
807 None
808 }
809 }
810
811 pub(crate) fn insert_constant(
818 catalog: &Catalog,
819 session: &mut Session,
820 target_id: CatalogItemId,
821 constants: MirRelationExpr,
822 ) -> Result<ExecuteResponse, AdapterError> {
823 let desc = match catalog.try_get_entry(&target_id) {
825 Some(table) => {
826 table.relation_desc_latest().expect("table has desc")
828 }
829 None => {
830 return Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
831 kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
832 target_id.to_string(),
833 )),
834 }));
835 }
836 };
837
838 match constants.as_const() {
839 Some((rows, ..)) => {
840 let rows = rows.clone()?;
841 for (row, _) in &rows {
842 for (i, datum) in row.iter().enumerate() {
843 desc.constraints_met(i, &datum)?;
844 }
845 }
846 let diffs_plan = plan::SendDiffsPlan {
847 id: target_id,
848 updates: rows,
849 kind: MutationKind::Insert,
850 returning: Vec::new(),
851 max_result_size: catalog.system_config().max_result_size(),
852 };
853 Self::send_diffs(session, diffs_plan)
854 }
855 None => panic!(
856 "tried using sequence_insert_constant on non-constant MirRelationExpr\n{}",
857 constants.pretty(),
858 ),
859 }
860 }
861
    /// Stages a set of row diffs as write ops on the session's transaction
    /// and computes the affected-row count reported to the client.
    ///
    /// If any diff is negative, the updates are first consolidated (so
    /// matching insertions/retractions cancel) and the count is the sum of
    /// absolute diffs; otherwise it is the plain sum. When the plan carries a
    /// `RETURNING` clause, the returned rows are finished and sent
    /// immediately instead of a row-count response.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) fn send_diffs(
        session: &mut Session,
        mut plan: plan::SendDiffsPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let affected_rows = {
            let mut affected_rows = Diff::from(0);
            let mut all_positive_diffs = true;
            for (_, diff) in plan.updates.iter() {
                if diff.is_negative() {
                    all_positive_diffs = false;
                    break;
                }

                affected_rows += diff;
            }

            if !all_positive_diffs {
                // Consolidate so that negative and positive diffs for the
                // same row cancel out before counting.
                differential_dataflow::consolidation::consolidate(&mut plan.updates);

                affected_rows = Diff::ZERO;
                for (_, diff) in plan.updates.iter() {
                    affected_rows += diff.abs();
                }
            }

            usize::try_from(affected_rows.into_inner()).expect("positive Diff must fit")
        };
        event!(
            Level::TRACE,
            affected_rows,
            id = format!("{:?}", plan.id),
            kind = format!("{:?}", plan.kind),
            updates = plan.updates.len(),
            returning = plan.returning.len(),
        );

        session.add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
            id: plan.id,
            rows: TableData::Rows(plan.updates),
        }]))?;
        if !plan.returning.is_empty() {
            // RETURNING: project all columns, with no ordering or limit, and
            // ship the finished rows to the client immediately.
            let finishing = RowSetFinishing {
                order_by: Vec::new(),
                limit: None,
                offset: 0,
                project: (0..plan.returning[0].0.iter().count()).collect(),
            };
            let max_returned_query_size = session.vars().max_query_result_size();
            let duration_histogram = session.metrics().row_set_finishing_seconds();

            return match finishing.finish(
                RowCollection::new(plan.returning, &finishing.order_by),
                plan.max_result_size,
                Some(max_returned_query_size),
                duration_histogram,
            ) {
                Ok((rows, _size_bytes)) => Ok(Self::send_immediate_rows(rows)),
                Err(e) => Err(AdapterError::ResultSize(e)),
            };
        }
        Ok(match plan.kind {
            MutationKind::Delete => ExecuteResponse::Deleted(affected_rows),
            MutationKind::Insert => ExecuteResponse::Inserted(affected_rows),
            // An UPDATE contributes a retraction plus an insertion per row,
            // so halve the diff count to report updated rows.
            MutationKind::Update => ExecuteResponse::Updated(affected_rows / 2),
        })
    }
938}
939
940pub(crate) fn check_log_reads(
949 catalog: &Catalog,
950 cluster: &Cluster,
951 source_ids: &BTreeSet<GlobalId>,
952 target_replica: &mut Option<ReplicaId>,
953 vars: &SessionVars,
954) -> Result<impl IntoIterator<Item = AdapterNotice>, AdapterError>
955where
956{
957 let log_names = source_ids
958 .iter()
959 .map(|gid| catalog.resolve_item_id(gid))
960 .flat_map(|item_id| catalog.introspection_dependencies(item_id))
961 .map(|item_id| catalog.get_entry(&item_id).name().item.clone())
962 .collect::<Vec<_>>();
963
964 if log_names.is_empty() {
965 return Ok(None);
966 }
967
968 let num_replicas = cluster.replicas().count();
972 if target_replica.is_none() {
973 if num_replicas == 1 {
974 *target_replica = cluster.replicas().map(|r| r.replica_id).next();
975 } else {
976 return Err(AdapterError::UntargetedLogRead { log_names });
977 }
978 }
979
980 let replica_id = target_replica.expect("set to `Some` above");
983 let replica = &cluster.replica(replica_id).expect("Replica must exist");
984 if !replica.config.compute.logging.enabled() {
985 return Err(AdapterError::IntrospectionDisabled { log_names });
986 }
987
988 Ok(vars
989 .emit_introspection_query_notice()
990 .then_some(AdapterNotice::PerReplicaLogRead { log_names }))
991}
992
993pub(crate) fn emit_optimizer_notices(
995 catalog: &Catalog,
996 session: &Session,
997 notices: &Vec<RawOptimizerNotice>,
998) {
999 if notices.is_empty() {
1001 return;
1002 }
1003 let humanizer = catalog.for_session(session);
1004 let system_vars = catalog.system_config();
1005 for notice in notices {
1006 let kind = OptimizerNoticeKind::from(notice);
1007 let notice_enabled = match kind {
1008 OptimizerNoticeKind::EqualsNull => system_vars.enable_notices_for_equals_null(),
1009 OptimizerNoticeKind::IndexAlreadyExists => {
1010 system_vars.enable_notices_for_index_already_exists()
1011 }
1012 OptimizerNoticeKind::IndexTooWideForLiteralConstraints => {
1013 system_vars.enable_notices_for_index_too_wide_for_literal_constraints()
1014 }
1015 OptimizerNoticeKind::IndexKeyEmpty => system_vars.enable_notices_for_index_empty_key(),
1016 };
1017 if notice_enabled {
1018 session.add_notice(AdapterNotice::OptimizerNotice {
1022 notice: notice.message(&humanizer, false).to_string(),
1023 hint: notice.hint(&humanizer, false).to_string(),
1024 });
1025 }
1026 session
1027 .metrics()
1028 .optimization_notices(&[kind.metric_label()])
1029 .inc_by(1);
1030 }
1031}
1032
1033pub fn eval_copy_to_uri(
1038 to: HirScalarExpr,
1039 session: &Session,
1040 catalog_state: &CatalogState,
1041) -> Result<Uri, AdapterError> {
1042 let style = ExprPrepOneShot {
1043 logical_time: EvalTime::NotAvailable,
1044 session,
1045 catalog_state,
1046 };
1047 let mut to = to.lower_uncorrelated(catalog_state.system_config())?;
1048 style.prep_scalar_expr(&mut to)?;
1049 let temp_storage = RowArena::new();
1050 let evaled = to.eval(&[], &temp_storage)?;
1051 if evaled == Datum::Null {
1052 coord_bail!("COPY TO target value can not be null");
1053 }
1054 let to_url = match Uri::from_str(evaled.unwrap_str()) {
1055 Ok(url) => {
1056 if url.scheme_str() != Some("s3") {
1057 coord_bail!("only 's3://...' urls are supported as COPY TO target");
1058 }
1059 url
1060 }
1061 Err(e) => coord_bail!("could not parse COPY TO target url: {}", e),
1062 };
1063 Ok(to_url)
1064}
1065
1066pub(crate) async fn explain_pushdown_future_inner<
1073 I: IntoIterator<Item = (GlobalId, MapFilterProject)>,
1074>(
1075 session: &Session,
1076 catalog: &Catalog,
1077 storage_collections: &Arc<dyn StorageCollections<Timestamp = Timestamp> + Send + Sync>,
1078 as_of: Antichain<Timestamp>,
1079 mz_now: ResultSpec<'static>,
1080 imports: I,
1081) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
1082 let explain_timeout = *session.vars().statement_timeout();
1083 let mut futures = FuturesOrdered::new();
1084 for (id, mfp) in imports {
1085 let catalog_entry = catalog.get_entry_by_global_id(&id);
1086 let full_name = catalog
1087 .for_session(session)
1088 .resolve_full_name(&catalog_entry.name);
1089 let name = format!("{}", full_name);
1090 let relation_desc = catalog_entry
1091 .relation_desc()
1092 .expect("source should have a proper desc")
1093 .into_owned();
1094 let stats_future = storage_collections
1095 .snapshot_parts_stats(id, as_of.clone())
1096 .await;
1097
1098 let mz_now = mz_now.clone();
1099 futures.push_back(async move {
1104 let snapshot_stats = match stats_future.await {
1105 Ok(stats) => stats,
1106 Err(e) => return Err(e),
1107 };
1108 let mut total_bytes = 0;
1109 let mut total_parts = 0;
1110 let mut selected_bytes = 0;
1111 let mut selected_parts = 0;
1112 for SnapshotPartStats {
1113 encoded_size_bytes: bytes,
1114 stats,
1115 } in &snapshot_stats.parts
1116 {
1117 let bytes = u64::cast_from(*bytes);
1118 total_bytes += bytes;
1119 total_parts += 1u64;
1120 let selected = match stats {
1121 None => true,
1122 Some(stats) => {
1123 let stats = stats.decode();
1124 let stats = RelationPartStats::new(
1125 name.as_str(),
1126 &snapshot_stats.metrics.pushdown.part_stats,
1127 &relation_desc,
1128 &stats,
1129 );
1130 stats.may_match_mfp(mz_now.clone(), &mfp)
1131 }
1132 };
1133
1134 if selected {
1135 selected_bytes += bytes;
1136 selected_parts += 1u64;
1137 }
1138 }
1139 Ok(Row::pack_slice(&[
1140 name.as_str().into(),
1141 total_bytes.into(),
1142 selected_bytes.into(),
1143 total_parts.into(),
1144 selected_parts.into(),
1145 ]))
1146 });
1147 }
1148
1149 let fut = async move {
1150 match tokio::time::timeout(
1151 explain_timeout,
1152 futures::TryStreamExt::try_collect::<Vec<_>>(futures),
1153 )
1154 .await
1155 {
1156 Ok(Ok(rows)) => Ok(ExecuteResponse::SendingRowsImmediate {
1157 rows: Box::new(rows.into_row_iter()),
1158 }),
1159 Ok(Err(err)) => Err(err.into()),
1160 Err(_) => Err(AdapterError::StatementTimeout),
1161 }
1162 };
1163 fut
1164}
1165
1166pub(crate) async fn explain_plan_inner(
1169 session: &Session,
1170 catalog: &Catalog,
1171 df_meta: DataflowMetainfo,
1172 explain_ctx: ExplainPlanContext,
1173 optimizer: peek::Optimizer,
1174 insights_ctx: Option<Box<PlanInsightsContext>>,
1175) -> Result<Vec<Row>, AdapterError> {
1176 let ExplainPlanContext {
1177 config,
1178 format,
1179 stage,
1180 desc,
1181 optimizer_trace,
1182 ..
1183 } = explain_ctx;
1184
1185 let desc = desc.expect("RelationDesc for SelectPlan in EXPLAIN mode");
1186
1187 let session_catalog = catalog.for_session(session);
1188 let expr_humanizer = {
1189 let transient_items = btreemap! {
1190 optimizer.select_id() => TransientItem::new(
1191 Some(vec![GlobalId::Explain.to_string()]),
1192 Some(desc.iter_names().map(|c| c.to_string()).collect()),
1193 )
1194 };
1195 ExprHumanizerExt::new(transient_items, &session_catalog)
1196 };
1197
1198 let finishing = if optimizer.finishing().is_trivial(desc.arity()) {
1199 None
1200 } else {
1201 Some(optimizer.finishing().clone())
1202 };
1203
1204 let target_cluster = catalog.get_cluster(optimizer.cluster_id());
1205 let features = optimizer.config().features.clone();
1206
1207 let rows = optimizer_trace
1208 .into_rows(
1209 format,
1210 &config,
1211 &features,
1212 &expr_humanizer,
1213 finishing,
1214 Some(target_cluster),
1215 df_meta,
1216 stage,
1217 plan::ExplaineeStatementKind::Select,
1218 insights_ctx,
1219 )
1220 .await?;
1221
1222 Ok(rows)
1223}
1224
1225pub(crate) async fn statistics_oracle(
1230 session: &Session,
1231 source_ids: &BTreeSet<GlobalId>,
1232 query_as_of: &Antichain<Timestamp>,
1233 is_oneshot: bool,
1234 system_config: &vars::SystemVars,
1235 storage_collections: &dyn StorageCollections<Timestamp = Timestamp>,
1236) -> Result<Box<dyn StatisticsOracle>, AdapterError> {
1237 if !session.vars().enable_session_cardinality_estimates() {
1238 return Ok(Box::new(EmptyStatisticsOracle));
1239 }
1240
1241 let timeout = if is_oneshot {
1242 system_config.optimizer_oneshot_stats_timeout()
1244 } else {
1245 system_config.optimizer_stats_timeout()
1246 };
1247
1248 let cached_stats = mz_ore::future::timeout(
1249 timeout,
1250 CachedStatisticsOracle::new(source_ids, query_as_of, storage_collections),
1251 )
1252 .await;
1253
1254 match cached_stats {
1255 Ok(stats) => Ok(Box::new(stats)),
1256 Err(mz_ore::future::TimeoutError::DeadlineElapsed) => {
1257 warn!(
1258 is_oneshot = is_oneshot,
1259 "optimizer statistics collection timed out after {}ms",
1260 timeout.as_millis()
1261 );
1262
1263 Ok(Box::new(EmptyStatisticsOracle))
1264 }
1265 Err(mz_ore::future::TimeoutError::Inner(e)) => Err(AdapterError::Storage(e)),
1266 }
1267}
1268
/// A [`StatisticsOracle`] backed by a point-in-time snapshot of storage
/// collection statistics, cached per [`GlobalId`].
#[derive(Debug)]
struct CachedStatisticsOracle {
    /// Update count per collection, captured at construction time.
    cache: BTreeMap<GlobalId, usize>,
}
1273
1274impl CachedStatisticsOracle {
1275 pub async fn new<T: TimelyTimestamp>(
1276 ids: &BTreeSet<GlobalId>,
1277 as_of: &Antichain<T>,
1278 storage_collections: &dyn StorageCollections<Timestamp = T>,
1279 ) -> Result<Self, StorageError<T>> {
1280 let mut cache = BTreeMap::new();
1281
1282 for id in ids {
1283 let stats = storage_collections.snapshot_stats(*id, as_of.clone()).await;
1284
1285 match stats {
1286 Ok(stats) => {
1287 cache.insert(*id, stats.num_updates);
1288 }
1289 Err(StorageError::IdentifierMissing(id)) => {
1290 ::tracing::debug!("no statistics for {id}")
1291 }
1292 Err(e) => return Err(e),
1293 }
1294 }
1295
1296 Ok(Self { cache })
1297 }
1298}
1299
1300impl StatisticsOracle for CachedStatisticsOracle {
1301 fn cardinality_estimate(&self, id: GlobalId) -> Option<usize> {
1302 self.cache.get(&id).map(|estimate| *estimate)
1303 }
1304
1305 fn as_map(&self) -> BTreeMap<GlobalId, usize> {
1306 self.cache.clone()
1307 }
1308}