1#![allow(clippy::pub_use)]
12
13use std::collections::{BTreeMap, BTreeSet};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use futures::FutureExt;
20use futures::future::LocalBoxFuture;
21use futures::stream::FuturesOrdered;
22use http::Uri;
23use inner::return_if_err;
24use maplit::btreemap;
25use mz_catalog::memory::objects::Cluster;
26use mz_controller_types::ReplicaId;
27use mz_expr::row::RowCollection;
28use mz_expr::{MapFilterProject, MirRelationExpr, ResultSpec, RowSetFinishing};
29use mz_ore::cast::CastFrom;
30use mz_ore::tracing::OpenTelemetryContext;
31use mz_persist_client::stats::SnapshotPartStats;
32use mz_repr::explain::{ExprHumanizerExt, TransientItem};
33use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, Row, RowArena, Timestamp};
34use mz_sql::catalog::{CatalogError, SessionCatalog};
35use mz_sql::names::ResolvedIds;
36use mz_sql::plan::{
37 self, AbortTransactionPlan, CommitTransactionPlan, CopyFromSource, CreateRolePlan,
38 CreateSourcePlanBundle, FetchPlan, HirScalarExpr, MutationKind, Params, Plan, PlanKind,
39 RaisePlan,
40};
41use mz_sql::rbac;
42use mz_sql::session::metadata::SessionMetadata;
43use mz_sql::session::vars;
44use mz_sql::session::vars::SessionVars;
45use mz_sql_parser::ast::{Raw, Statement};
46use mz_storage_client::client::TableData;
47use mz_storage_client::storage_collections::StorageCollections;
48use mz_storage_types::connections::inline::IntoInlineConnection;
49use mz_storage_types::controller::StorageError;
50use mz_storage_types::stats::RelationPartStats;
51use mz_transform::dataflow::DataflowMetainfo;
52use mz_transform::notice::{OptimizerNoticeApi, OptimizerNoticeKind, RawOptimizerNotice};
53use mz_transform::{EmptyStatisticsOracle, StatisticsOracle};
54use timely::progress::Antichain;
55use timely::progress::Timestamp as TimelyTimestamp;
56use tokio::sync::oneshot;
57use tracing::{Instrument, Level, Span, event, warn};
58
59use crate::ExecuteContext;
60use crate::catalog::{Catalog, CatalogState};
61use crate::command::{Command, ExecuteResponse, Response};
62use crate::coord::appends::{DeferredOp, DeferredPlan};
63use crate::coord::validity::PlanValidity;
64use crate::coord::{
65 Coordinator, DeferredPlanStatement, ExplainPlanContext, Message, PlanStatement, TargetCluster,
66 catalog_serving,
67};
68use crate::error::AdapterError;
69use crate::explain::insights::PlanInsightsContext;
70use crate::notice::AdapterNotice;
71use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
72use crate::optimize::peek;
73use crate::session::{
74 EndTransactionAction, Session, StateRevision, TransactionOps, TransactionStatus, WriteOp,
75};
76use crate::util::ClientTransmitter;
77
78mod inner;
98
99impl Coordinator {
    /// Dispatches a fully-planned statement to the sequencing routine for its
    /// `Plan` variant.
    ///
    /// Before dispatching, this performs the checks shared by all plans:
    /// read-only-mode rejection, deferral behind pending startup appends,
    /// target-cluster resolution, catalog-serving cluster restrictions, and
    /// RBAC. Each match arm either retires `ctx` with a result directly or
    /// hands `ctx` off to a helper that retires it later.
    ///
    /// Returns a boxed non-`Send` future; the work runs when that future is
    /// awaited, instrumented with a `coord::sequencer::sequence_plan` span.
    pub(crate) fn sequence_plan(
        &mut self,
        mut ctx: ExecuteContext,
        plan: Plan,
        resolved_ids: ResolvedIds,
    ) -> LocalBoxFuture<'_, ()> {
        async move {
            // Restrict the responses the client transmitter will accept to
            // those this plan kind can legitimately generate.
            let responses = ExecuteResponse::generated_from(&PlanKind::from(&plan));
            ctx.tx_mut().set_allowed(responses);

            if self.controller.read_only() && !plan.allowed_in_read_only() {
                ctx.retire(Err(AdapterError::ReadOnly));
                return;
            }

            // If this plan reads from collections whose startup appends have
            // not yet landed, park it as a deferred op until they do.
            if let Some((dependencies, wait_future)) =
                super::appends::waiting_on_startup_appends(self.catalog(), ctx.session_mut(), &plan)
            {
                let conn_id = ctx.session().conn_id();
                tracing::debug!(%conn_id, "deferring plan for startup appends");

                let role_metadata = ctx.session().role_metadata().clone();
                let validity = PlanValidity::new(
                    self.catalog.transient_revision(),
                    dependencies,
                    None,
                    None,
                    role_metadata,
                );
                let deferred_plan = DeferredPlan {
                    ctx,
                    plan,
                    validity,
                    requires_locks: BTreeSet::default(),
                };
                let acquire_future = wait_future.map(|()| None);

                self.defer_op(acquire_future, DeferredOp::Plan(deferred_plan));

                return;
            };

            // A transaction pins its cluster; otherwise certain introspection
            // queries may be auto-routed to the catalog server cluster.
            let target_cluster = match ctx.session().transaction().cluster() {
                Some(cluster_id) => TargetCluster::Transaction(cluster_id),
                None => {
                    let session_catalog = self.catalog.for_session(ctx.session());
                    catalog_serving::auto_run_on_catalog_server(
                        &session_catalog,
                        ctx.session(),
                        &plan,
                    )
                }
            };
            // Resolution failure is tolerated here: plans that don't need a
            // cluster proceed with `None`, and cluster-requiring sequencing
            // paths surface their own errors.
            let (target_cluster_id, target_cluster_name) = match self
                .catalog()
                .resolve_target_cluster(target_cluster, ctx.session())
            {
                Ok(cluster) => (Some(cluster.id), Some(cluster.name.clone())),
                Err(_) => (None, None),
            };

            // Record the chosen cluster for statement logging, if any.
            if let (Some(cluster_id), Some(statement_id)) =
                (target_cluster_id, ctx.extra().contents())
            {
                self.set_statement_execution_cluster(statement_id, cluster_id);
            }

            let session_catalog = self.catalog.for_session(ctx.session());

            if let Some(cluster_name) = &target_cluster_name {
                if let Err(e) = catalog_serving::check_cluster_restrictions(
                    cluster_name,
                    &session_catalog,
                    &plan,
                ) {
                    return ctx.retire(Err(e));
                }
            }

            // RBAC check; the closure resolves a connection id to the
            // authenticated role of that connection, if it is still active.
            if let Err(e) = rbac::check_plan(
                &session_catalog,
                Some(|id| {
                    self.active_conns()
                        .into_iter()
                        .find(|(conn_id, _)| conn_id.unhandled() == id)
                        .map(|(_, conn_meta)| *conn_meta.authenticated_role_id())
                }),
                ctx.session(),
                &plan,
                target_cluster_id,
                &resolved_ids,
            ) {
                return ctx.retire(Err(e.into()));
            }

            match plan {
                Plan::CreateSource(plan) => {
                    let id_ts = self.get_catalog_write_ts().await;
                    let (item_id, global_id) =
                        return_if_err!(self.catalog().allocate_user_id(id_ts).await, ctx);
                    let result = self
                        .sequence_create_source(
                            &mut ctx,
                            vec![CreateSourcePlanBundle {
                                item_id,
                                global_id,
                                plan,
                                resolved_ids,
                                available_source_references: None,
                            }],
                        )
                        .await;
                    ctx.retire(result);
                }
                Plan::CreateSources(plans) => {
                    // The bundled plans carry their own resolved ids.
                    assert!(
                        resolved_ids.is_empty(),
                        "each plan has separate resolved_ids"
                    );
                    let result = self.sequence_create_source(&mut ctx, plans).await;
                    ctx.retire(result);
                }
                Plan::CreateConnection(plan) => {
                    self.sequence_create_connection(ctx, plan, resolved_ids)
                        .await;
                }
                Plan::CreateDatabase(plan) => {
                    let result = self.sequence_create_database(ctx.session_mut(), plan).await;
                    ctx.retire(result);
                }
                Plan::CreateSchema(plan) => {
                    let result = self.sequence_create_schema(ctx.session_mut(), plan).await;
                    ctx.retire(result);
                }
                Plan::CreateRole(plan) => {
                    let result = self
                        .sequence_create_role(Some(ctx.session().conn_id()), plan)
                        .await;
                    if let Some(notice) = self.should_emit_rbac_notice(ctx.session()) {
                        ctx.session().add_notice(notice);
                    }
                    ctx.retire(result);
                }
                Plan::CreateCluster(plan) => {
                    let result = self.sequence_create_cluster(ctx.session(), plan).await;
                    ctx.retire(result);
                }
                Plan::CreateClusterReplica(plan) => {
                    let result = self
                        .sequence_create_cluster_replica(ctx.session(), plan)
                        .await;
                    ctx.retire(result);
                }
                Plan::CreateTable(plan) => {
                    let result = self
                        .sequence_create_table(&mut ctx, plan, resolved_ids)
                        .await;
                    ctx.retire(result);
                }
                Plan::CreateSecret(plan) => {
                    self.sequence_create_secret(ctx, plan).await;
                }
                Plan::CreateSink(plan) => {
                    self.sequence_create_sink(ctx, plan, resolved_ids).await;
                }
                Plan::CreateView(plan) => {
                    self.sequence_create_view(ctx, plan, resolved_ids).await;
                }
                Plan::CreateMaterializedView(plan) => {
                    self.sequence_create_materialized_view(ctx, plan, resolved_ids)
                        .await;
                }
                Plan::CreateContinualTask(plan) => {
                    let res = self
                        .sequence_create_continual_task(&mut ctx, plan, resolved_ids)
                        .await;
                    ctx.retire(res);
                }
                Plan::CreateIndex(plan) => {
                    self.sequence_create_index(ctx, plan, resolved_ids).await;
                }
                Plan::CreateType(plan) => {
                    let result = self
                        .sequence_create_type(ctx.session(), plan, resolved_ids)
                        .await;
                    ctx.retire(result);
                }
                Plan::CreateNetworkPolicy(plan) => {
                    let res = self
                        .sequence_create_network_policy(ctx.session(), plan)
                        .await;
                    ctx.retire(res);
                }
                Plan::Comment(plan) => {
                    let result = self.sequence_comment_on(ctx.session(), plan).await;
                    ctx.retire(result);
                }
                Plan::CopyTo(plan) => {
                    self.sequence_copy_to(ctx, plan, target_cluster).await;
                }
                Plan::DropObjects(plan) => {
                    let result = self.sequence_drop_objects(&mut ctx, plan).await;
                    ctx.retire(result);
                }
                Plan::DropOwned(plan) => {
                    let result = self.sequence_drop_owned(ctx.session_mut(), plan).await;
                    ctx.retire(result);
                }
                Plan::EmptyQuery => {
                    ctx.retire(Ok(ExecuteResponse::EmptyQuery));
                }
                Plan::ShowAllVariables => {
                    let result = self.sequence_show_all_variables(ctx.session());
                    ctx.retire(result);
                }
                Plan::ShowVariable(plan) => {
                    let result = self.sequence_show_variable(ctx.session(), plan);
                    ctx.retire(result);
                }
                Plan::InspectShard(plan) => {
                    let result = self.sequence_inspect_shard(ctx.session(), plan).await;
                    ctx.retire(result);
                }
                Plan::SetVariable(plan) => {
                    let result = self.sequence_set_variable(ctx.session_mut(), plan);
                    ctx.retire(result);
                }
                Plan::ResetVariable(plan) => {
                    let result = self.sequence_reset_variable(ctx.session_mut(), plan);
                    ctx.retire(result);
                }
                Plan::SetTransaction(plan) => {
                    let result = self.sequence_set_transaction(ctx.session_mut(), plan);
                    ctx.retire(result);
                }
                Plan::StartTransaction(plan) => {
                    // Nested BEGIN is a notice, not an error, matching
                    // PostgreSQL behavior.
                    if matches!(
                        ctx.session().transaction(),
                        TransactionStatus::InTransaction(_)
                    ) {
                        ctx.session()
                            .add_notice(AdapterNotice::ExistingTransactionInProgress);
                    }
                    let result = ctx.session_mut().start_transaction(
                        self.now_datetime(),
                        plan.access,
                        plan.isolation_level,
                    );
                    ctx.retire(result.map(|_| ExecuteResponse::StartedTransaction))
                }
                Plan::CommitTransaction(CommitTransactionPlan {
                    ref transaction_type,
                })
                | Plan::AbortTransaction(AbortTransactionPlan {
                    ref transaction_type,
                }) => {
                    // DDL transactions are serialized: either take the global
                    // DDL lock now, or queue this COMMIT/ROLLBACK behind it.
                    if ctx.session().transaction().is_ddl() {
                        if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
                            let prev = self
                                .active_conns
                                .get_mut(ctx.session().conn_id())
                                .expect("connection must exist")
                                .deferred_lock
                                .replace(guard);
                            assert!(
                                prev.is_none(),
                                "connections should have at most one lock guard"
                            );
                        } else {
                            self.serialized_ddl.push_back(DeferredPlanStatement {
                                ctx,
                                ps: PlanStatement::Plan { plan, resolved_ids },
                            });
                            return;
                        }
                    }

                    let action = match &plan {
                        Plan::CommitTransaction(_) => EndTransactionAction::Commit,
                        Plan::AbortTransaction(_) => EndTransactionAction::Rollback,
                        _ => unreachable!(),
                    };
                    if ctx.session().transaction().is_implicit() && !transaction_type.is_implicit()
                    {
                        // Explicit COMMIT/ROLLBACK inside an implicit
                        // transaction gets a warning, as in PostgreSQL.
                        ctx.session().add_notice(
                            AdapterNotice::ExplicitTransactionControlInImplicitTransaction,
                        );
                    }
                    self.sequence_end_transaction(ctx, action).await;
                }
                Plan::Select(plan) => {
                    let max = Some(ctx.session().vars().max_query_result_size());
                    self.sequence_peek(ctx, plan, target_cluster, max).await;
                }
                Plan::Subscribe(plan) => {
                    self.sequence_subscribe(ctx, plan, target_cluster).await;
                }
                Plan::SideEffectingFunc(plan) => {
                    self.sequence_side_effecting_func(ctx, plan).await;
                }
                Plan::ShowCreate(plan) => {
                    ctx.retire(Ok(Self::send_immediate_rows(plan.row)));
                }
                Plan::ShowColumns(show_columns_plan) => {
                    // SHOW COLUMNS is implemented as a regular peek over its
                    // embedded SELECT plan.
                    let max = Some(ctx.session().vars().max_query_result_size());
                    self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster, max)
                        .await;
                }
                Plan::CopyFrom(plan) => match plan.source {
                    CopyFromSource::Stdin => {
                        // COPY FROM STDIN hands control back to the protocol
                        // layer, which streams rows and later completes via
                        // `ctx_extra`.
                        let (tx, _, session, ctx_extra) = ctx.into_parts();
                        tx.send(
                            Ok(ExecuteResponse::CopyFrom {
                                target_id: plan.target_id,
                                target_name: plan.target_name,
                                columns: plan.columns,
                                params: plan.params,
                                ctx_extra,
                            }),
                            session,
                        );
                    }
                    CopyFromSource::Url(_) | CopyFromSource::AwsS3 { .. } => {
                        self.sequence_copy_from(ctx, plan, target_cluster).await;
                    }
                },
                Plan::ExplainPlan(plan) => {
                    self.sequence_explain_plan(ctx, plan, target_cluster).await;
                }
                Plan::ExplainPushdown(plan) => {
                    self.sequence_explain_pushdown(ctx, plan, target_cluster)
                        .await;
                }
                Plan::ExplainSinkSchema(plan) => {
                    let result = self.sequence_explain_schema(plan);
                    ctx.retire(result);
                }
                Plan::ExplainTimestamp(plan) => {
                    self.sequence_explain_timestamp(ctx, plan, target_cluster)
                        .await;
                }
                Plan::Insert(plan) => {
                    self.sequence_insert(ctx, plan).await;
                }
                Plan::ReadThenWrite(plan) => {
                    self.sequence_read_then_write(ctx, plan).await;
                }
                Plan::AlterNoop(plan) => {
                    ctx.retire(Ok(ExecuteResponse::AlteredObject(plan.object_type)));
                }
                Plan::AlterCluster(plan) => {
                    self.sequence_alter_cluster_staged(ctx, plan).await;
                }
                Plan::AlterClusterRename(plan) => {
                    let result = self.sequence_alter_cluster_rename(&mut ctx, plan).await;
                    ctx.retire(result);
                }
                Plan::AlterClusterSwap(plan) => {
                    let result = self.sequence_alter_cluster_swap(&mut ctx, plan).await;
                    ctx.retire(result);
                }
                Plan::AlterClusterReplicaRename(plan) => {
                    let result = self
                        .sequence_alter_cluster_replica_rename(ctx.session(), plan)
                        .await;
                    ctx.retire(result);
                }
                Plan::AlterConnection(plan) => {
                    self.sequence_alter_connection(ctx, plan).await;
                }
                Plan::AlterSetCluster(plan) => {
                    let result = self.sequence_alter_set_cluster(ctx.session(), plan).await;
                    ctx.retire(result);
                }
                Plan::AlterRetainHistory(plan) => {
                    let result = self.sequence_alter_retain_history(&mut ctx, plan).await;
                    ctx.retire(result);
                }
                Plan::AlterSourceTimestampInterval(plan) => {
                    let result = self
                        .sequence_alter_source_timestamp_interval(&mut ctx, plan)
                        .await;
                    ctx.retire(result);
                }
                Plan::AlterItemRename(plan) => {
                    let result = self.sequence_alter_item_rename(&mut ctx, plan).await;
                    ctx.retire(result);
                }
                Plan::AlterSchemaRename(plan) => {
                    let result = self.sequence_alter_schema_rename(&mut ctx, plan).await;
                    ctx.retire(result);
                }
                Plan::AlterSchemaSwap(plan) => {
                    let result = self.sequence_alter_schema_swap(&mut ctx, plan).await;
                    ctx.retire(result);
                }
                Plan::AlterRole(plan) => {
                    let result = self.sequence_alter_role(ctx.session_mut(), plan).await;
                    ctx.retire(result);
                }
                Plan::AlterSecret(plan) => {
                    self.sequence_alter_secret(ctx, plan).await;
                }
                Plan::AlterSink(plan) => {
                    self.sequence_alter_sink_prepare(ctx, plan).await;
                }
                Plan::AlterSource(plan) => {
                    let result = self.sequence_alter_source(ctx.session_mut(), plan).await;
                    ctx.retire(result);
                }
                Plan::AlterSystemSet(plan) => {
                    let result = self.sequence_alter_system_set(ctx.session(), plan).await;
                    ctx.retire(result);
                }
                Plan::AlterSystemReset(plan) => {
                    let result = self.sequence_alter_system_reset(ctx.session(), plan).await;
                    ctx.retire(result);
                }
                Plan::AlterSystemResetAll(plan) => {
                    let result = self
                        .sequence_alter_system_reset_all(ctx.session(), plan)
                        .await;
                    ctx.retire(result);
                }
                Plan::AlterTableAddColumn(plan) => {
                    let result = self.sequence_alter_table(&mut ctx, plan).await;
                    ctx.retire(result);
                }
                Plan::AlterMaterializedViewApplyReplacement(plan) => {
                    self.sequence_alter_materialized_view_apply_replacement_prepare(ctx, plan)
                        .await;
                }
                Plan::AlterNetworkPolicy(plan) => {
                    let res = self
                        .sequence_alter_network_policy(ctx.session(), plan)
                        .await;
                    ctx.retire(res);
                }
                Plan::DiscardTemp => {
                    self.drop_temp_items(ctx.session().conn_id()).await;
                    ctx.retire(Ok(ExecuteResponse::DiscardedTemp));
                }
                Plan::DiscardAll => {
                    // DISCARD ALL is prohibited inside an explicit
                    // transaction, like in PostgreSQL.
                    let ret = if let TransactionStatus::Started(_) = ctx.session().transaction() {
                        self.clear_transaction(ctx.session_mut()).await;
                        self.drop_temp_items(ctx.session().conn_id()).await;
                        ctx.session_mut().reset();
                        Ok(ExecuteResponse::DiscardedAll)
                    } else {
                        Err(AdapterError::OperationProhibitsTransaction(
                            "DISCARD ALL".into(),
                        ))
                    };
                    ctx.retire(ret);
                }
                Plan::Declare(plan) => {
                    self.declare(ctx, plan.name, plan.stmt, plan.sql, plan.params);
                }
                Plan::Fetch(FetchPlan {
                    name,
                    count,
                    timeout,
                }) => {
                    // FETCH is completed by the protocol layer; pass along the
                    // statement-logging context so it can close it out.
                    let ctx_extra = std::mem::take(ctx.extra_mut());
                    ctx.retire(Ok(ExecuteResponse::Fetch {
                        name,
                        count,
                        timeout,
                        ctx_extra,
                    }));
                }
                Plan::Close(plan) => {
                    if ctx.session_mut().remove_portal(&plan.name) {
                        ctx.retire(Ok(ExecuteResponse::ClosedCursor));
                    } else {
                        ctx.retire(Err(AdapterError::UnknownCursor(plan.name)));
                    }
                }
                Plan::Prepare(plan) => {
                    if ctx
                        .session()
                        .get_prepared_statement_unverified(&plan.name)
                        .is_some()
                    {
                        ctx.retire(Err(AdapterError::PreparedStatementExists(plan.name)));
                    } else {
                        // Snapshot the catalog/session revisions so the
                        // statement can be re-verified at EXECUTE time.
                        let state_revision = StateRevision {
                            catalog_revision: self.catalog().transient_revision(),
                            session_state_revision: ctx.session().state_revision(),
                        };
                        ctx.session_mut().set_prepared_statement(
                            plan.name,
                            Some(plan.stmt),
                            plan.sql,
                            plan.desc,
                            state_revision,
                            self.now(),
                        );
                        ctx.retire(Ok(ExecuteResponse::Prepare));
                    }
                }
                Plan::Execute(plan) => {
                    // EXECUTE re-enters the coordinator as a fresh Command so
                    // the portal goes through the normal execution path.
                    match self.sequence_execute(ctx.session_mut(), plan) {
                        Ok(portal_name) => {
                            let (tx, _, session, extra) = ctx.into_parts();
                            self.internal_cmd_tx
                                .send(Message::Command(
                                    OpenTelemetryContext::obtain(),
                                    Command::Execute {
                                        portal_name,
                                        session,
                                        tx: tx.take(),
                                        outer_ctx_extra: Some(extra),
                                    },
                                ))
                                .expect("sending to self.internal_cmd_tx cannot fail");
                        }
                        Err(err) => ctx.retire(Err(err)),
                    };
                }
                Plan::Deallocate(plan) => match plan.name {
                    Some(name) => {
                        if ctx.session_mut().remove_prepared_statement(&name) {
                            ctx.retire(Ok(ExecuteResponse::Deallocate { all: false }));
                        } else {
                            ctx.retire(Err(AdapterError::UnknownPreparedStatement(name)));
                        }
                    }
                    None => {
                        ctx.session_mut().remove_all_prepared_statements();
                        ctx.retire(Ok(ExecuteResponse::Deallocate { all: true }));
                    }
                },
                Plan::Raise(RaisePlan { severity }) => {
                    ctx.session()
                        .add_notice(AdapterNotice::UserRequested { severity });
                    ctx.retire(Ok(ExecuteResponse::Raised));
                }
                Plan::GrantPrivileges(plan) => {
                    let result = self
                        .sequence_grant_privileges(ctx.session_mut(), plan)
                        .await;
                    ctx.retire(result);
                }
                Plan::RevokePrivileges(plan) => {
                    let result = self
                        .sequence_revoke_privileges(ctx.session_mut(), plan)
                        .await;
                    ctx.retire(result);
                }
                Plan::AlterDefaultPrivileges(plan) => {
                    let result = self
                        .sequence_alter_default_privileges(ctx.session_mut(), plan)
                        .await;
                    ctx.retire(result);
                }
                Plan::GrantRole(plan) => {
                    let result = self.sequence_grant_role(ctx.session_mut(), plan).await;
                    ctx.retire(result);
                }
                Plan::RevokeRole(plan) => {
                    let result = self.sequence_revoke_role(ctx.session_mut(), plan).await;
                    ctx.retire(result);
                }
                Plan::AlterOwner(plan) => {
                    let result = self.sequence_alter_owner(ctx.session_mut(), plan).await;
                    ctx.retire(result);
                }
                Plan::ReassignOwned(plan) => {
                    let result = self.sequence_reassign_owned(ctx.session_mut(), plan).await;
                    ctx.retire(result);
                }
                Plan::ValidateConnection(plan) => {
                    // Validation can be slow (network I/O), so it runs on a
                    // separate task and retires `ctx` from there.
                    let connection = plan
                        .connection
                        .into_inline_connection(self.catalog().state());
                    let current_storage_configuration = self.controller.storage.config().clone();
                    mz_ore::task::spawn(|| "coord::validate_connection", async move {
                        let res = match connection
                            .validate(plan.id, &current_storage_configuration)
                            .await
                        {
                            Ok(()) => Ok(ExecuteResponse::ValidatedConnection),
                            Err(err) => Err(err.into()),
                        };
                        ctx.retire(res);
                    });
                }
            }
        }
        .instrument(tracing::debug_span!("coord::sequencer::sequence_plan"))
        .boxed_local()
    }
714
    #[mz_ore::instrument(level = "debug")]
    /// Executes `stmt` inside its own implicit single-statement transaction,
    /// then auto-commits, and finally sends the combined result to the client.
    ///
    /// The client's transmitter is swapped for an internal oneshot channel so
    /// that the statement's response can be intercepted; a spawned task waits
    /// for that response, issues a `Command::Commit`, and forwards the final
    /// outcome on the original transmitter. If the statement failed, its error
    /// wins over the commit result.
    pub(crate) async fn sequence_execute_single_statement_transaction(
        &mut self,
        ctx: ExecuteContext,
        stmt: Arc<Statement<Raw>>,
        params: Params,
    ) {
        // Put the session into single-statement-transaction mode; the session
        // must not already be in a transaction.
        let (tx, internal_cmd_tx, mut session, extra) = ctx.into_parts();
        assert!(matches!(session.transaction(), TransactionStatus::Default));
        session.start_transaction_single_stmt(self.now_datetime());
        let conn_id = session.conn_id().unhandled();

        // Execute with a substitute transmitter so we see the result first.
        let (sub_tx, sub_rx) = oneshot::channel();
        let sub_ct = ClientTransmitter::new(sub_tx, self.internal_cmd_tx.clone());
        let sub_ctx = ExecuteContext::from_parts(sub_ct, internal_cmd_tx, session, extra);
        self.handle_execute_inner(stmt, params, sub_ctx).await;

        let internal_cmd_tx = self.internal_cmd_tx.clone();
        mz_ore::task::spawn(
            || format!("execute_single_statement:{conn_id}"),
            async move {
                // A dropped sender means the execution path already handled
                // (or abandoned) the response; nothing more to do.
                let Ok(Response {
                    result,
                    session,
                    otel_ctx,
                }) = sub_rx.await
                else {
                    return;
                };
                otel_ctx.attach_as_parent();
                // Auto-commit the implicit transaction via the coordinator.
                let (sub_tx, sub_rx) = oneshot::channel();
                let _ = internal_cmd_tx.send(Message::Command(
                    otel_ctx,
                    Command::Commit {
                        action: EndTransactionAction::Commit,
                        session,
                        tx: sub_tx,
                    },
                ));
                let Ok(commit_response) = sub_rx.await else {
                    return;
                };
                assert!(matches!(
                    commit_response.session.transaction(),
                    TransactionStatus::Default
                ));
                // Statement errors take precedence; otherwise report the
                // commit's outcome (which carries the success response).
                let result = match (result, commit_response.result) {
                    (Ok(_), commit) => commit,
                    (Err(result), _) => Err(result),
                };
                tx.send(result, commit_response.session);
            }
            .instrument(Span::current()),
        );
    }
783
    #[mz_ore::instrument(level = "debug")]
    /// Creates a role during coordinator startup (e.g. bootstrap-provisioned
    /// roles), where no client connection exists.
    ///
    /// Thin wrapper over [`Self::sequence_create_role`] with `conn_id: None`.
    pub(crate) async fn sequence_create_role_for_startup(
        &mut self,
        plan: CreateRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.sequence_create_role(None, plan).await
    }
800
    /// Allocates a fresh transient (non-durable) catalog item id and its
    /// corresponding global collection id from the shared generator.
    pub(crate) fn allocate_transient_id(&self) -> (CatalogItemId, GlobalId) {
        self.transient_id_gen.allocate_id()
    }
804
805 fn should_emit_rbac_notice(&self, session: &Session) -> Option<AdapterNotice> {
806 if !rbac::is_rbac_enabled_for_session(self.catalog.system_config(), session) {
807 Some(AdapterNotice::RbacUserDisabled)
808 } else {
809 None
810 }
811 }
812
813 pub(crate) fn insert_constant(
820 catalog: &Catalog,
821 session: &mut Session,
822 target_id: CatalogItemId,
823 constants: MirRelationExpr,
824 ) -> Result<ExecuteResponse, AdapterError> {
825 let desc = match catalog.try_get_entry(&target_id) {
827 Some(table) => {
828 table.relation_desc_latest().expect("table has desc")
830 }
831 None => {
832 return Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
833 kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
834 target_id.to_string(),
835 )),
836 }));
837 }
838 };
839
840 match constants.as_const() {
841 Some((rows, ..)) => {
842 let rows = rows.clone()?;
843 for (row, _) in &rows {
844 for (i, datum) in row.iter().enumerate() {
845 desc.constraints_met(i, &datum)?;
846 }
847 }
848 let diffs_plan = plan::SendDiffsPlan {
849 id: target_id,
850 updates: rows,
851 kind: MutationKind::Insert,
852 returning: Vec::new(),
853 max_result_size: catalog.system_config().max_result_size(),
854 };
855 Self::send_diffs(session, diffs_plan)
856 }
857 None => panic!(
858 "tried using sequence_insert_constant on non-constant MirRelationExpr\n{}",
859 constants.pretty(),
860 ),
861 }
862 }
863
    #[mz_ore::instrument(level = "debug")]
    /// Stages the updates from a `SendDiffsPlan` as write ops on the session's
    /// transaction and returns the response to report to the client.
    ///
    /// The affected-row count is the sum of the diffs when all are positive;
    /// otherwise the updates are first consolidated and the absolute diffs
    /// summed, so retract/insert pairs are counted consistently. When the plan
    /// carries a `RETURNING` clause, the returned rows are finished and sent
    /// immediately instead of a bare count.
    pub(crate) fn send_diffs(
        session: &mut Session,
        mut plan: plan::SendDiffsPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let affected_rows = {
            let mut affected_rows = Diff::from(0);
            let mut all_positive_diffs = true;
            // Fast path: a pure insert batch is just the sum of diffs.
            for (_, diff) in plan.updates.iter() {
                if diff.is_negative() {
                    all_positive_diffs = false;
                    break;
                }

                affected_rows += diff;
            }

            if !all_positive_diffs {
                // Mixed retractions/insertions: consolidate first so that
                // cancelling pairs don't inflate the count, then sum absolute
                // values.
                differential_dataflow::consolidation::consolidate(&mut plan.updates);

                affected_rows = Diff::ZERO;
                for (_, diff) in plan.updates.iter() {
                    affected_rows += diff.abs();
                }
            }

            usize::try_from(affected_rows.into_inner()).expect("positive Diff must fit")
        };
        event!(
            Level::TRACE,
            affected_rows,
            id = format!("{:?}", plan.id),
            kind = format!("{:?}", plan.kind),
            updates = plan.updates.len(),
            returning = plan.returning.len(),
        );

        // Stage the writes on the transaction; they land at commit time.
        session.add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
            id: plan.id,
            rows: TableData::Rows(plan.updates),
        }]))?;
        if !plan.returning.is_empty() {
            // RETURNING: project all columns of the returned rows, unsorted
            // and unlimited.
            let finishing = RowSetFinishing {
                order_by: Vec::new(),
                limit: None,
                offset: 0,
                project: (0..plan.returning[0].0.iter().count()).collect(),
            };
            let max_returned_query_size = session.vars().max_query_result_size();
            let duration_histogram = session.metrics().row_set_finishing_seconds();

            return match finishing.finish(
                RowCollection::new(plan.returning, &finishing.order_by),
                plan.max_result_size,
                Some(max_returned_query_size),
                duration_histogram,
            ) {
                Ok((rows, _size_bytes)) => Ok(Self::send_immediate_rows(rows)),
                Err(e) => Err(AdapterError::ResultSize(e)),
            };
        }
        Ok(match plan.kind {
            MutationKind::Delete => ExecuteResponse::Deleted(affected_rows),
            MutationKind::Insert => ExecuteResponse::Inserted(affected_rows),
            // An update consolidates to a retraction plus an insertion per
            // row, so halve the absolute count.
            MutationKind::Update => ExecuteResponse::Updated(affected_rows / 2),
        })
    }
940}
941
942pub(crate) fn check_log_reads(
951 catalog: &Catalog,
952 cluster: &Cluster,
953 source_ids: &BTreeSet<GlobalId>,
954 target_replica: &mut Option<ReplicaId>,
955 vars: &SessionVars,
956) -> Result<impl IntoIterator<Item = AdapterNotice>, AdapterError>
957where
958{
959 let log_names = source_ids
960 .iter()
961 .map(|gid| catalog.resolve_item_id(gid))
962 .flat_map(|item_id| catalog.introspection_dependencies(item_id))
963 .map(|item_id| catalog.get_entry(&item_id).name().item.clone())
964 .collect::<Vec<_>>();
965
966 if log_names.is_empty() {
967 return Ok(None);
968 }
969
970 let num_replicas = cluster.replicas().count();
974 if target_replica.is_none() {
975 if num_replicas == 1 {
976 *target_replica = cluster.replicas().map(|r| r.replica_id).next();
977 } else {
978 return Err(AdapterError::UntargetedLogRead { log_names });
979 }
980 }
981
982 let replica_id = target_replica.expect("set to `Some` above");
985 let replica = &cluster.replica(replica_id).expect("Replica must exist");
986 if !replica.config.compute.logging.enabled() {
987 return Err(AdapterError::IntrospectionDisabled { log_names });
988 }
989
990 Ok(vars
991 .emit_introspection_query_notice()
992 .then_some(AdapterNotice::PerReplicaLogRead { log_names }))
993}
994
995pub(crate) fn emit_optimizer_notices(
997 catalog: &Catalog,
998 session: &Session,
999 notices: &Vec<RawOptimizerNotice>,
1000) {
1001 if notices.is_empty() {
1003 return;
1004 }
1005 let humanizer = catalog.for_session(session);
1006 let system_vars = catalog.system_config();
1007 for notice in notices {
1008 let kind = OptimizerNoticeKind::from(notice);
1009 let notice_enabled = match kind {
1010 OptimizerNoticeKind::EqualsNull => system_vars.enable_notices_for_equals_null(),
1011 OptimizerNoticeKind::IndexAlreadyExists => {
1012 system_vars.enable_notices_for_index_already_exists()
1013 }
1014 OptimizerNoticeKind::IndexTooWideForLiteralConstraints => {
1015 system_vars.enable_notices_for_index_too_wide_for_literal_constraints()
1016 }
1017 OptimizerNoticeKind::IndexKeyEmpty => system_vars.enable_notices_for_index_empty_key(),
1018 };
1019 if notice_enabled {
1020 session.add_notice(AdapterNotice::OptimizerNotice {
1024 notice: notice.message(&humanizer, false).to_string(),
1025 hint: notice.hint(&humanizer, false).to_string(),
1026 });
1027 }
1028 session
1029 .metrics()
1030 .optimization_notices(&[kind.metric_label()])
1031 .inc_by(1);
1032 }
1033}
1034
1035pub fn eval_copy_to_uri(
1040 to: HirScalarExpr,
1041 session: &Session,
1042 catalog_state: &CatalogState,
1043) -> Result<Uri, AdapterError> {
1044 let style = ExprPrepOneShot {
1045 logical_time: EvalTime::NotAvailable,
1046 session,
1047 catalog_state,
1048 };
1049 let mut to = to.lower_uncorrelated(catalog_state.system_config())?;
1050 style.prep_scalar_expr(&mut to)?;
1051 let temp_storage = RowArena::new();
1052 let evaled = to.eval(&[], &temp_storage)?;
1053 if evaled == Datum::Null {
1054 coord_bail!("COPY TO target value can not be null");
1055 }
1056 let to_url = match Uri::from_str(evaled.unwrap_str()) {
1057 Ok(url) => {
1058 if url.scheme_str() != Some("s3") {
1059 coord_bail!("only 's3://...' urls are supported as COPY TO target");
1060 }
1061 url
1062 }
1063 Err(e) => coord_bail!("could not parse COPY TO target url: {}", e),
1064 };
1065 Ok(to_url)
1066}
1067
/// Builds the future that computes `EXPLAIN FILTER PUSHDOWN` output: one row
/// per imported collection with total vs. filter-selected part counts/bytes.
///
/// The outer `async fn` eagerly kicks off a part-stats snapshot per import
/// (hence the returned future-of-a-future, with `use<I>` limiting the
/// captured generics); the returned inner future then evaluates each part's
/// stats against the MFP and collects the rows, bounded by the session's
/// `statement_timeout`.
pub(crate) async fn explain_pushdown_future_inner<
    I: IntoIterator<Item = (GlobalId, MapFilterProject)>,
>(
    session: &Session,
    catalog: &Catalog,
    storage_collections: &Arc<dyn StorageCollections<Timestamp = Timestamp> + Send + Sync>,
    as_of: Antichain<Timestamp>,
    mz_now: ResultSpec<'static>,
    imports: I,
) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
    let explain_timeout = *session.vars().statement_timeout();
    // FuturesOrdered keeps output rows in the same order as `imports`.
    let mut futures = FuturesOrdered::new();
    for (id, mfp) in imports {
        let catalog_entry = catalog.get_entry_by_global_id(&id);
        let full_name = catalog
            .for_session(session)
            .resolve_full_name(&catalog_entry.name);
        let name = format!("{}", full_name);
        let relation_desc = catalog_entry
            .relation_desc()
            .expect("source should have a proper desc")
            .into_owned();
        // Request the stats now; awaiting the request (not the stats
        // themselves) happens in this loop.
        let stats_future = storage_collections
            .snapshot_parts_stats(id, as_of.clone())
            .await;

        let mz_now = mz_now.clone();
        futures.push_back(async move {
            let snapshot_stats = match stats_future.await {
                Ok(stats) => stats,
                Err(e) => return Err(e),
            };
            // Tally every part, and separately the parts whose stats admit a
            // possible MFP match (parts without stats must be read).
            let mut total_bytes = 0;
            let mut total_parts = 0;
            let mut selected_bytes = 0;
            let mut selected_parts = 0;
            for SnapshotPartStats {
                encoded_size_bytes: bytes,
                stats,
            } in &snapshot_stats.parts
            {
                let bytes = u64::cast_from(*bytes);
                total_bytes += bytes;
                total_parts += 1u64;
                let selected = match stats {
                    None => true,
                    Some(stats) => {
                        let stats = stats.decode();
                        let stats = RelationPartStats::new(
                            name.as_str(),
                            &snapshot_stats.metrics.pushdown.part_stats,
                            &relation_desc,
                            &stats,
                        );
                        stats.may_match_mfp(mz_now.clone(), &mfp)
                    }
                };

                if selected {
                    selected_bytes += bytes;
                    selected_parts += 1u64;
                }
            }
            Ok(Row::pack_slice(&[
                name.as_str().into(),
                total_bytes.into(),
                selected_bytes.into(),
                total_parts.into(),
                selected_parts.into(),
            ]))
        });
    }

    let fut = async move {
        match tokio::time::timeout(
            explain_timeout,
            futures::TryStreamExt::try_collect::<Vec<_>>(futures),
        )
        .await
        {
            Ok(Ok(rows)) => Ok(ExecuteResponse::SendingRowsImmediate {
                rows: Box::new(rows.into_row_iter()),
            }),
            Ok(Err(err)) => Err(err.into()),
            // The timer elapsing maps to a statement timeout error.
            Err(_) => Err(AdapterError::StatementTimeout),
        }
    };
    fut
}
1167
1168pub(crate) async fn explain_plan_inner(
1171 session: &Session,
1172 catalog: &Catalog,
1173 df_meta: DataflowMetainfo,
1174 explain_ctx: ExplainPlanContext,
1175 optimizer: peek::Optimizer,
1176 insights_ctx: Option<Box<PlanInsightsContext>>,
1177) -> Result<Vec<Row>, AdapterError> {
1178 let ExplainPlanContext {
1179 config,
1180 format,
1181 stage,
1182 desc,
1183 optimizer_trace,
1184 ..
1185 } = explain_ctx;
1186
1187 let desc = desc.expect("RelationDesc for SelectPlan in EXPLAIN mode");
1188
1189 let session_catalog = catalog.for_session(session);
1190 let expr_humanizer = {
1191 let transient_items = btreemap! {
1192 optimizer.select_id() => TransientItem::new(
1193 Some(vec![GlobalId::Explain.to_string()]),
1194 Some(desc.iter_names().map(|c| c.to_string()).collect()),
1195 )
1196 };
1197 ExprHumanizerExt::new(transient_items, &session_catalog)
1198 };
1199
1200 let finishing = if optimizer.finishing().is_trivial(desc.arity()) {
1201 None
1202 } else {
1203 Some(optimizer.finishing().clone())
1204 };
1205
1206 let target_cluster = catalog.get_cluster(optimizer.cluster_id());
1207 let features = optimizer.config().features.clone();
1208
1209 let rows = optimizer_trace
1210 .into_rows(
1211 format,
1212 &config,
1213 &features,
1214 &expr_humanizer,
1215 finishing,
1216 Some(target_cluster),
1217 df_meta,
1218 stage,
1219 plan::ExplaineeStatementKind::Select,
1220 insights_ctx,
1221 )
1222 .await?;
1223
1224 Ok(rows)
1225}
1226
1227pub(crate) async fn statistics_oracle(
1232 session: &Session,
1233 source_ids: &BTreeSet<GlobalId>,
1234 query_as_of: &Antichain<Timestamp>,
1235 is_oneshot: bool,
1236 system_config: &vars::SystemVars,
1237 storage_collections: &dyn StorageCollections<Timestamp = Timestamp>,
1238) -> Result<Box<dyn StatisticsOracle>, AdapterError> {
1239 if !session.vars().enable_session_cardinality_estimates() {
1240 return Ok(Box::new(EmptyStatisticsOracle));
1241 }
1242
1243 let timeout = if is_oneshot {
1244 system_config.optimizer_oneshot_stats_timeout()
1246 } else {
1247 system_config.optimizer_stats_timeout()
1248 };
1249
1250 let cached_stats = mz_ore::future::timeout(
1251 timeout,
1252 CachedStatisticsOracle::new(source_ids, query_as_of, storage_collections),
1253 )
1254 .await;
1255
1256 match cached_stats {
1257 Ok(stats) => Ok(Box::new(stats)),
1258 Err(mz_ore::future::TimeoutError::DeadlineElapsed) => {
1259 warn!(
1260 is_oneshot = is_oneshot,
1261 "optimizer statistics collection timed out after {}ms",
1262 timeout.as_millis()
1263 );
1264
1265 Ok(Box::new(EmptyStatisticsOracle))
1266 }
1267 Err(mz_ore::future::TimeoutError::Inner(e)) => Err(AdapterError::Storage(e)),
1268 }
1269}
1270
/// A [`StatisticsOracle`] backed by a point-in-time snapshot of per-collection
/// update counts, gathered once at construction time.
#[derive(Debug)]
struct CachedStatisticsOracle {
    // Cardinality estimate (snapshot `num_updates`) per collection id.
    cache: BTreeMap<GlobalId, usize>,
}
1275
1276impl CachedStatisticsOracle {
1277 pub async fn new<T: TimelyTimestamp>(
1278 ids: &BTreeSet<GlobalId>,
1279 as_of: &Antichain<T>,
1280 storage_collections: &dyn StorageCollections<Timestamp = T>,
1281 ) -> Result<Self, StorageError<T>> {
1282 let mut cache = BTreeMap::new();
1283
1284 for id in ids {
1285 let stats = storage_collections.snapshot_stats(*id, as_of.clone()).await;
1286
1287 match stats {
1288 Ok(stats) => {
1289 cache.insert(*id, stats.num_updates);
1290 }
1291 Err(StorageError::IdentifierMissing(id)) => {
1292 ::tracing::debug!("no statistics for {id}")
1293 }
1294 Err(e) => return Err(e),
1295 }
1296 }
1297
1298 Ok(Self { cache })
1299 }
1300}
1301
1302impl StatisticsOracle for CachedStatisticsOracle {
1303 fn cardinality_estimate(&self, id: GlobalId) -> Option<usize> {
1304 self.cache.get(&id).map(|estimate| *estimate)
1305 }
1306
1307 fn as_map(&self) -> BTreeMap<GlobalId, usize> {
1308 self.cache.clone()
1309 }
1310}