1#![allow(clippy::pub_use)]
12
13use std::collections::{BTreeMap, BTreeSet};
16use std::str::FromStr;
17use std::sync::Arc;
18use std::time::Duration;
19
20use futures::FutureExt;
21use futures::future::LocalBoxFuture;
22use futures::stream::FuturesOrdered;
23use http::Uri;
24use inner::return_if_err;
25use maplit::btreemap;
26use mz_catalog::memory::objects::Cluster;
27use mz_controller_types::ReplicaId;
28use mz_expr::row::RowCollection;
29use mz_expr::{MapFilterProject, MirRelationExpr, ResultSpec, RowSetFinishing};
30use mz_ore::cast::CastFrom;
31use mz_ore::tracing::OpenTelemetryContext;
32use mz_persist_client::stats::SnapshotPartStats;
33use mz_repr::explain::{ExprHumanizerExt, TransientItem};
34use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, Row, RowArena, Timestamp};
35use mz_sql::catalog::{CatalogError, SessionCatalog};
36use mz_sql::names::ResolvedIds;
37use mz_sql::plan::{
38 self, AbortTransactionPlan, CommitTransactionPlan, CopyFromSource, CreateRolePlan,
39 CreateSourcePlanBundle, FetchPlan, HirScalarExpr, MutationKind, Params, Plan, PlanKind,
40 RaisePlan,
41};
42use mz_sql::rbac;
43use mz_sql::session::metadata::SessionMetadata;
44use mz_sql::session::vars;
45use mz_sql::session::vars::SessionVars;
46use mz_sql_parser::ast::{Raw, Statement};
47use mz_storage_client::client::TableData;
48use mz_storage_client::storage_collections::StorageCollections;
49use mz_storage_types::connections::inline::IntoInlineConnection;
50use mz_storage_types::controller::StorageError;
51use mz_storage_types::stats::RelationPartStats;
52use mz_transform::dataflow::DataflowMetainfo;
53use mz_transform::notice::{OptimizerNoticeApi, OptimizerNoticeKind, RawOptimizerNotice};
54use mz_transform::{EmptyStatisticsOracle, StatisticsOracle};
55use timely::progress::Antichain;
56use tokio::sync::oneshot;
57use tracing::{Instrument, Level, Span, event, warn};
58
59use crate::ExecuteContext;
60use crate::catalog::{Catalog, CatalogState};
61use crate::command::{Command, ExecuteResponse, Response};
62use crate::coord::appends::{DeferredOp, DeferredPlan};
63use crate::coord::validity::PlanValidity;
64use crate::coord::{
65 Coordinator, DeferredPlanStatement, ExplainPlanContext, Message, PlanStatement, TargetCluster,
66 catalog_serving,
67};
68use crate::error::AdapterError;
69use crate::explain::insights::PlanInsightsContext;
70use crate::notice::AdapterNotice;
71use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
72use crate::optimize::peek;
73use crate::session::{
74 EndTransactionAction, Session, StateRevision, TransactionOps, TransactionStatus, WriteOp,
75};
76use crate::util::ClientTransmitter;
77
78mod inner;
98
99impl Coordinator {
100 pub(crate) fn sequence_plan(
104 &mut self,
105 mut ctx: ExecuteContext,
106 plan: Plan,
107 resolved_ids: ResolvedIds,
108 ) -> LocalBoxFuture<'_, ()> {
109 async move {
110 let responses = ExecuteResponse::generated_from(&PlanKind::from(&plan));
111 ctx.tx_mut().set_allowed(responses);
112
113 if self.controller.read_only() && !plan.allowed_in_read_only() {
114 ctx.retire(Err(AdapterError::ReadOnly));
115 return;
116 }
117
118 if let Some((dependencies, wait_future)) =
121 super::appends::waiting_on_startup_appends(self.catalog(), ctx.session_mut(), &plan)
122 {
123 let conn_id = ctx.session().conn_id();
124 tracing::debug!(%conn_id, "deferring plan for startup appends");
125
126 let role_metadata = ctx.session().role_metadata().clone();
127 let validity = PlanValidity::new(
128 self.catalog.transient_revision(),
129 dependencies,
130 None,
131 None,
132 role_metadata,
133 );
134 let deferred_plan = DeferredPlan {
135 ctx,
136 plan,
137 validity,
138 requires_locks: BTreeSet::default(),
139 };
140 let acquire_future = wait_future.map(|()| None);
143
144 self.defer_op(acquire_future, DeferredOp::Plan(deferred_plan));
145
146 return;
149 };
150
151 let target_cluster = match ctx.session().transaction().cluster() {
153 Some(cluster_id) => TargetCluster::Transaction(cluster_id),
155 None => {
157 let session_catalog = self.catalog.for_session(ctx.session());
158 catalog_serving::auto_run_on_catalog_server(
159 &session_catalog,
160 ctx.session(),
161 &plan,
162 )
163 }
164 };
165 let (target_cluster_id, target_cluster_name) = match self
166 .catalog()
167 .resolve_target_cluster(target_cluster, ctx.session())
168 {
169 Ok(cluster) => (Some(cluster.id), Some(cluster.name.clone())),
170 Err(_) => (None, None),
171 };
172
173 if let (Some(cluster_id), Some(statement_id)) =
174 (target_cluster_id, ctx.extra().contents())
175 {
176 self.set_statement_execution_cluster(statement_id, cluster_id);
177 }
178
179 let session_catalog = self.catalog.for_session(ctx.session());
180
181 if let Some(cluster_name) = &target_cluster_name {
182 if let Err(e) = catalog_serving::check_cluster_restrictions(
183 cluster_name,
184 &session_catalog,
185 &plan,
186 ) {
187 return ctx.retire(Err(e));
188 }
189 }
190
191 if let Err(e) = rbac::check_plan(
192 &session_catalog,
193 Some(|id| {
194 self.active_conns()
197 .into_iter()
198 .find(|(conn_id, _)| conn_id.unhandled() == id)
199 .map(|(_, conn_meta)| *conn_meta.authenticated_role_id())
200 }),
201 ctx.session(),
202 &plan,
203 target_cluster_id,
204 &resolved_ids,
205 ) {
206 return ctx.retire(Err(e.into()));
207 }
208
209 match plan {
210 Plan::CreateSource(plan) => {
211 let (item_id, global_id) = return_if_err!(self.allocate_user_id().await, ctx);
212 let result = self
213 .sequence_create_source(
214 &mut ctx,
215 vec![CreateSourcePlanBundle {
216 item_id,
217 global_id,
218 plan,
219 resolved_ids,
220 available_source_references: None,
221 }],
222 )
223 .await;
224 ctx.retire(result);
225 }
226 Plan::CreateSources(plans) => {
227 assert!(
228 resolved_ids.is_empty(),
229 "each plan has separate resolved_ids"
230 );
231 let result = self.sequence_create_source(&mut ctx, plans).await;
232 ctx.retire(result);
233 }
234 Plan::CreateConnection(plan) => {
235 self.sequence_create_connection(ctx, plan, resolved_ids)
236 .await;
237 }
238 Plan::CreateDatabase(plan) => {
239 let result = self.sequence_create_database(ctx.session_mut(), plan).await;
240 ctx.retire(result);
241 }
242 Plan::CreateSchema(plan) => {
243 let result = self.sequence_create_schema(ctx.session_mut(), plan).await;
244 ctx.retire(result);
245 }
246 Plan::CreateRole(plan) => {
247 let result = self
248 .sequence_create_role(Some(ctx.session().conn_id()), plan)
249 .await;
250 if let Some(notice) = self.should_emit_rbac_notice(ctx.session()) {
251 ctx.session().add_notice(notice);
252 }
253 ctx.retire(result);
254 }
255 Plan::CreateCluster(plan) => {
256 let result = self.sequence_create_cluster(ctx.session(), plan).await;
257 ctx.retire(result);
258 }
259 Plan::CreateClusterReplica(plan) => {
260 let result = self
261 .sequence_create_cluster_replica(ctx.session(), plan)
262 .await;
263 ctx.retire(result);
264 }
265 Plan::CreateTable(plan) => {
266 let result = self
267 .sequence_create_table(&mut ctx, plan, resolved_ids)
268 .await;
269 ctx.retire(result);
270 }
271 Plan::CreateSecret(plan) => {
272 self.sequence_create_secret(ctx, plan).await;
273 }
274 Plan::CreateSink(plan) => {
275 self.sequence_create_sink(ctx, plan, resolved_ids).await;
276 }
277 Plan::CreateView(plan) => {
278 self.sequence_create_view(ctx, plan, resolved_ids).await;
279 }
280 Plan::CreateMaterializedView(plan) => {
281 self.sequence_create_materialized_view(ctx, plan, resolved_ids)
282 .await;
283 }
284 Plan::CreateIndex(plan) => {
285 self.sequence_create_index(ctx, plan, resolved_ids).await;
286 }
287 Plan::CreateType(plan) => {
288 let result = self
289 .sequence_create_type(ctx.session(), plan, resolved_ids)
290 .await;
291 ctx.retire(result);
292 }
293 Plan::CreateNetworkPolicy(plan) => {
294 let res = self
295 .sequence_create_network_policy(ctx.session(), plan)
296 .await;
297 ctx.retire(res);
298 }
299 Plan::Comment(plan) => {
300 let result = self.sequence_comment_on(ctx.session(), plan).await;
301 ctx.retire(result);
302 }
303 Plan::CopyTo(plan) => {
304 self.sequence_copy_to(ctx, plan, target_cluster).await;
305 }
306 Plan::DropObjects(plan) => {
307 let result = self.sequence_drop_objects(&mut ctx, plan).await;
308 ctx.retire(result);
309 }
310 Plan::DropOwned(plan) => {
311 let result = self.sequence_drop_owned(ctx.session_mut(), plan).await;
312 ctx.retire(result);
313 }
314 Plan::EmptyQuery => {
315 ctx.retire(Ok(ExecuteResponse::EmptyQuery));
316 }
317 Plan::ShowAllVariables => {
318 let result = self.sequence_show_all_variables(ctx.session());
319 ctx.retire(result);
320 }
321 Plan::ShowVariable(plan) => {
322 let result = self.sequence_show_variable(ctx.session(), plan);
323 ctx.retire(result);
324 }
325 Plan::InspectShard(plan) => {
326 let result = self.sequence_inspect_shard(ctx.session(), plan).await;
328 ctx.retire(result);
329 }
330 Plan::SetVariable(plan) => {
331 let result = self.sequence_set_variable(ctx.session_mut(), plan);
332 ctx.retire(result);
333 }
334 Plan::ResetVariable(plan) => {
335 let result = self.sequence_reset_variable(ctx.session_mut(), plan);
336 ctx.retire(result);
337 }
338 Plan::SetTransaction(plan) => {
339 let result = self.sequence_set_transaction(ctx.session_mut(), plan);
340 ctx.retire(result);
341 }
342 Plan::StartTransaction(plan) => {
343 if matches!(
344 ctx.session().transaction(),
345 TransactionStatus::InTransaction(_)
346 ) {
347 ctx.session()
348 .add_notice(AdapterNotice::ExistingTransactionInProgress);
349 }
350 let result = ctx.session_mut().start_transaction(
351 self.now_datetime(),
352 plan.access,
353 plan.isolation_level,
354 );
355 ctx.retire(result.map(|_| ExecuteResponse::StartedTransaction))
356 }
357 Plan::CommitTransaction(CommitTransactionPlan {
358 ref transaction_type,
359 })
360 | Plan::AbortTransaction(AbortTransactionPlan {
361 ref transaction_type,
362 }) => {
363 if ctx.session().transaction().is_ddl() {
366 if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
367 let prev = self
368 .active_conns
369 .get_mut(ctx.session().conn_id())
370 .expect("connection must exist")
371 .deferred_lock
372 .replace(guard);
373 assert!(
374 prev.is_none(),
375 "connections should have at most one lock guard"
376 );
377 } else {
378 self.serialized_ddl.push_back(DeferredPlanStatement {
379 ctx,
380 ps: PlanStatement::Plan { plan, resolved_ids },
381 });
382 return;
383 }
384 }
385
386 let action = match &plan {
387 Plan::CommitTransaction(_) => EndTransactionAction::Commit,
388 Plan::AbortTransaction(_) => EndTransactionAction::Rollback,
389 _ => unreachable!(),
390 };
391 if ctx.session().transaction().is_implicit() && !transaction_type.is_implicit()
392 {
393 ctx.session().add_notice(
398 AdapterNotice::ExplicitTransactionControlInImplicitTransaction,
399 );
400 }
401 self.sequence_end_transaction(ctx, action).await;
402 }
403 Plan::Select(plan) => {
404 let max = Some(ctx.session().vars().max_query_result_size());
405 self.sequence_peek(ctx, plan, target_cluster, max).await;
406 }
407 Plan::Subscribe(plan) => {
408 self.sequence_subscribe(ctx, plan, target_cluster).await;
409 }
410 Plan::SideEffectingFunc(plan) => {
411 self.sequence_side_effecting_func(ctx, plan).await;
412 }
413 Plan::ShowCreate(plan) => {
414 ctx.retire(Ok(Self::send_immediate_rows(plan.row)));
415 }
416 Plan::ShowColumns(show_columns_plan) => {
417 let max = Some(ctx.session().vars().max_query_result_size());
418 self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster, max)
419 .await;
420 }
421 Plan::CopyFrom(plan) => match plan.source {
422 CopyFromSource::Stdin => {
423 let (tx, _, session, ctx_extra) = ctx.into_parts();
424 tx.send(
425 Ok(ExecuteResponse::CopyFrom {
426 target_id: plan.target_id,
427 target_name: plan.target_name,
428 columns: plan.columns,
429 params: plan.params,
430 ctx_extra,
431 }),
432 session,
433 );
434 }
435 CopyFromSource::Url(_) | CopyFromSource::AwsS3 { .. } => {
436 self.sequence_copy_from(ctx, plan, target_cluster).await;
437 }
438 },
439 Plan::ExplainPlan(plan) => {
440 self.sequence_explain_plan(ctx, plan, target_cluster).await;
441 }
442 Plan::ExplainPushdown(plan) => {
443 self.sequence_explain_pushdown(ctx, plan, target_cluster)
444 .await;
445 }
446 Plan::ExplainSinkSchema(plan) => {
447 let result = self.sequence_explain_schema(plan);
448 ctx.retire(result);
449 }
450 Plan::ExplainTimestamp(plan) => {
451 self.sequence_explain_timestamp(ctx, plan, target_cluster)
452 .await;
453 }
454 Plan::Insert(plan) => {
455 self.sequence_insert(ctx, plan).await;
456 }
457 Plan::ReadThenWrite(plan) => {
458 self.sequence_read_then_write(ctx, plan).await;
459 }
460 Plan::AlterNoop(plan) => {
461 ctx.retire(Ok(ExecuteResponse::AlteredObject(plan.object_type)));
462 }
463 Plan::AlterCluster(plan) => {
464 self.sequence_alter_cluster_staged(ctx, plan).await;
465 }
466 Plan::AlterClusterRename(plan) => {
467 let result = self.sequence_alter_cluster_rename(&mut ctx, plan).await;
468 ctx.retire(result);
469 }
470 Plan::AlterClusterSwap(plan) => {
471 let result = self.sequence_alter_cluster_swap(&mut ctx, plan).await;
472 ctx.retire(result);
473 }
474 Plan::AlterClusterReplicaRename(plan) => {
475 let result = self
476 .sequence_alter_cluster_replica_rename(ctx.session(), plan)
477 .await;
478 ctx.retire(result);
479 }
480 Plan::AlterConnection(plan) => {
481 self.sequence_alter_connection(ctx, plan).await;
482 }
483 Plan::AlterSetCluster(plan) => {
484 let result = self.sequence_alter_set_cluster(ctx.session(), plan).await;
485 ctx.retire(result);
486 }
487 Plan::AlterRetainHistory(plan) => {
488 let result = self.sequence_alter_retain_history(&mut ctx, plan).await;
489 ctx.retire(result);
490 }
491 Plan::AlterSourceTimestampInterval(plan) => {
492 let result = self
493 .sequence_alter_source_timestamp_interval(&mut ctx, plan)
494 .await;
495 ctx.retire(result);
496 }
497 Plan::AlterItemRename(plan) => {
498 let result = self.sequence_alter_item_rename(&mut ctx, plan).await;
499 ctx.retire(result);
500 }
501 Plan::AlterSchemaRename(plan) => {
502 let result = self.sequence_alter_schema_rename(&mut ctx, plan).await;
503 ctx.retire(result);
504 }
505 Plan::AlterSchemaSwap(plan) => {
506 let result = self.sequence_alter_schema_swap(&mut ctx, plan).await;
507 ctx.retire(result);
508 }
509 Plan::AlterRole(plan) => {
510 let result = self.sequence_alter_role(ctx.session_mut(), plan).await;
511 ctx.retire(result);
512 }
513 Plan::AlterSecret(plan) => {
514 self.sequence_alter_secret(ctx, plan).await;
515 }
516 Plan::AlterSink(plan) => {
517 self.sequence_alter_sink_prepare(ctx, plan).await;
518 }
519 Plan::AlterSource(plan) => {
520 let result = self.sequence_alter_source(ctx.session_mut(), plan).await;
521 ctx.retire(result);
522 }
523 Plan::AlterSystemSet(plan) => {
524 let result = self.sequence_alter_system_set(ctx.session(), plan).await;
525 ctx.retire(result);
526 }
527 Plan::AlterSystemReset(plan) => {
528 let result = self.sequence_alter_system_reset(ctx.session(), plan).await;
529 ctx.retire(result);
530 }
531 Plan::AlterSystemResetAll(plan) => {
532 let result = self
533 .sequence_alter_system_reset_all(ctx.session(), plan)
534 .await;
535 ctx.retire(result);
536 }
537 Plan::AlterTableAddColumn(plan) => {
538 let result = self.sequence_alter_table(&mut ctx, plan).await;
539 ctx.retire(result);
540 }
541 Plan::AlterMaterializedViewApplyReplacement(plan) => {
542 self.sequence_alter_materialized_view_apply_replacement_prepare(ctx, plan)
543 .await;
544 }
545 Plan::AlterNetworkPolicy(plan) => {
546 let res = self
547 .sequence_alter_network_policy(ctx.session(), plan)
548 .await;
549 ctx.retire(res);
550 }
551 Plan::DiscardTemp => {
552 self.drop_temp_items(ctx.session().conn_id()).await;
553 ctx.retire(Ok(ExecuteResponse::DiscardedTemp));
554 }
555 Plan::DiscardAll => {
556 let ret = if let TransactionStatus::Started(_) = ctx.session().transaction() {
557 self.clear_transaction(ctx.session_mut()).await;
558 self.drop_temp_items(ctx.session().conn_id()).await;
559 ctx.session_mut().reset();
560 Ok(ExecuteResponse::DiscardedAll)
561 } else {
562 Err(AdapterError::OperationProhibitsTransaction(
563 "DISCARD ALL".into(),
564 ))
565 };
566 ctx.retire(ret);
567 }
568 Plan::Declare(plan) => {
569 self.declare(ctx, plan.name, plan.stmt, plan.sql, plan.params);
570 }
571 Plan::Fetch(FetchPlan {
572 name,
573 count,
574 timeout,
575 }) => {
576 let ctx_extra = std::mem::take(ctx.extra_mut());
577 ctx.retire(Ok(ExecuteResponse::Fetch {
578 name,
579 count,
580 timeout,
581 ctx_extra,
582 }));
583 }
584 Plan::Close(plan) => {
585 if ctx.session_mut().remove_portal(&plan.name) {
586 ctx.retire(Ok(ExecuteResponse::ClosedCursor));
587 } else {
588 ctx.retire(Err(AdapterError::UnknownCursor(plan.name)));
589 }
590 }
591 Plan::Prepare(plan) => {
592 if ctx
593 .session()
594 .get_prepared_statement_unverified(&plan.name)
595 .is_some()
596 {
597 ctx.retire(Err(AdapterError::PreparedStatementExists(plan.name)));
598 } else {
599 let state_revision = StateRevision {
600 catalog_revision: self.catalog().transient_revision(),
601 session_state_revision: ctx.session().state_revision(),
602 };
603 ctx.session_mut().set_prepared_statement(
604 plan.name,
605 Some(plan.stmt),
606 plan.sql,
607 plan.desc,
608 state_revision,
609 self.now(),
610 );
611 ctx.retire(Ok(ExecuteResponse::Prepare));
612 }
613 }
614 Plan::Execute(plan) => {
615 match self.sequence_execute(ctx.session_mut(), plan) {
616 Ok(portal_name) => {
617 let (tx, _, session, extra) = ctx.into_parts();
618 self.internal_cmd_tx
619 .send(Message::Command(
620 OpenTelemetryContext::obtain(),
621 Command::Execute {
622 portal_name,
623 session,
624 tx: tx.take(),
625 outer_ctx_extra: Some(extra),
626 },
627 ))
628 .expect("sending to self.internal_cmd_tx cannot fail");
629 }
630 Err(err) => ctx.retire(Err(err)),
631 };
632 }
633 Plan::Deallocate(plan) => match plan.name {
634 Some(name) => {
635 if ctx.session_mut().remove_prepared_statement(&name) {
636 ctx.retire(Ok(ExecuteResponse::Deallocate { all: false }));
637 } else {
638 ctx.retire(Err(AdapterError::UnknownPreparedStatement(name)));
639 }
640 }
641 None => {
642 ctx.session_mut().remove_all_prepared_statements();
643 ctx.retire(Ok(ExecuteResponse::Deallocate { all: true }));
644 }
645 },
646 Plan::Raise(RaisePlan { severity }) => {
647 ctx.session()
648 .add_notice(AdapterNotice::UserRequested { severity });
649 ctx.retire(Ok(ExecuteResponse::Raised));
650 }
651 Plan::GrantPrivileges(plan) => {
652 let result = self
653 .sequence_grant_privileges(ctx.session_mut(), plan)
654 .await;
655 ctx.retire(result);
656 }
657 Plan::RevokePrivileges(plan) => {
658 let result = self
659 .sequence_revoke_privileges(ctx.session_mut(), plan)
660 .await;
661 ctx.retire(result);
662 }
663 Plan::AlterDefaultPrivileges(plan) => {
664 let result = self
665 .sequence_alter_default_privileges(ctx.session_mut(), plan)
666 .await;
667 ctx.retire(result);
668 }
669 Plan::GrantRole(plan) => {
670 let result = self.sequence_grant_role(ctx.session_mut(), plan).await;
671 ctx.retire(result);
672 }
673 Plan::RevokeRole(plan) => {
674 let result = self.sequence_revoke_role(ctx.session_mut(), plan).await;
675 ctx.retire(result);
676 }
677 Plan::AlterOwner(plan) => {
678 let result = self.sequence_alter_owner(ctx.session_mut(), plan).await;
679 ctx.retire(result);
680 }
681 Plan::ReassignOwned(plan) => {
682 let result = self.sequence_reassign_owned(ctx.session_mut(), plan).await;
683 ctx.retire(result);
684 }
685 Plan::ValidateConnection(plan) => {
686 let connection = plan
687 .connection
688 .into_inline_connection(self.catalog().state());
689 let current_storage_configuration = self.controller.storage.config().clone();
690 mz_ore::task::spawn(|| "coord::validate_connection", async move {
691 let res = match connection
692 .validate(plan.id, ¤t_storage_configuration)
693 .await
694 {
695 Ok(()) => Ok(ExecuteResponse::ValidatedConnection),
696 Err(err) => Err(err.into()),
697 };
698 ctx.retire(res);
699 });
700 }
701 }
702 }
703 .instrument(tracing::debug_span!("coord::sequencer::sequence_plan"))
704 .boxed_local()
705 }
706
    /// Executes a single statement inside a fresh single-statement implicit
    /// transaction, then commits (or surfaces the statement's error).
    ///
    /// The statement result is routed through an internal oneshot channel so
    /// the commit can be issued after execution completes, off the
    /// coordinator's main task.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sequence_execute_single_statement_transaction(
        &mut self,
        ctx: ExecuteContext,
        stmt: Arc<Statement<Raw>>,
        params: Params,
    ) {
        // The session must not already be in a transaction; start a
        // single-statement implicit one.
        let (tx, internal_cmd_tx, mut session, extra) = ctx.into_parts();
        assert!(matches!(session.transaction(), TransactionStatus::Default));
        session.start_transaction_single_stmt(self.now_datetime());
        let conn_id = session.conn_id().unhandled();

        // Execute the statement with a substitute transmitter so the response
        // arrives on `sub_rx` instead of going straight to the client.
        let (sub_tx, sub_rx) = oneshot::channel();
        let sub_ct = ClientTransmitter::new(sub_tx, self.internal_cmd_tx.clone());
        let sub_ctx = ExecuteContext::from_parts(sub_ct, internal_cmd_tx, session, extra);
        self.handle_execute_inner(stmt, params, sub_ctx).await;

        // Await the execution response in a spawned task, then issue the
        // commit via the internal command channel and relay the final result.
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        mz_ore::task::spawn(
            || format!("execute_single_statement:{conn_id}"),
            async move {
                // If the sender was dropped, the statement was retired
                // elsewhere; nothing to relay.
                let Ok(Response {
                    result,
                    session,
                    otel_ctx,
                }) = sub_rx.await
                else {
                    return;
                };
                otel_ctx.attach_as_parent();
                let (sub_tx, sub_rx) = oneshot::channel();
                let _ = internal_cmd_tx.send(Message::Command(
                    otel_ctx,
                    Command::Commit {
                        action: EndTransactionAction::Commit,
                        session,
                        tx: sub_tx,
                    },
                ));
                let Ok(commit_response) = sub_rx.await else {
                    return;
                };
                assert!(matches!(
                    commit_response.session.transaction(),
                    TransactionStatus::Default
                ));
                // A statement error wins over the commit outcome; otherwise
                // report the commit's result.
                let result = match (result, commit_response.result) {
                    (Ok(_), commit) => commit,
                    (Err(result), _) => Err(result),
                };
                tx.send(result, commit_response.session);
            }
            .instrument(Span::current()),
        );
    }
775
    /// Creates a role during coordinator startup (no active session), by
    /// delegating to [`Coordinator::sequence_create_role`] with no connection
    /// id.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sequence_create_role_for_startup(
        &mut self,
        plan: CreateRolePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        self.sequence_create_role(None, plan).await
    }
792
    /// Allocates a fresh transient (catalog item id, global id) pair from the
    /// shared transient id generator.
    pub(crate) fn allocate_transient_id(&self) -> (CatalogItemId, GlobalId) {
        self.transient_id_gen.allocate_id()
    }
796
797 fn should_emit_rbac_notice(&self, session: &Session) -> Option<AdapterNotice> {
798 if !rbac::is_rbac_enabled_for_session(self.catalog.system_config(), session) {
799 Some(AdapterNotice::RbacUserDisabled)
800 } else {
801 None
802 }
803 }
804
805 pub(crate) fn insert_constant(
812 catalog: &Catalog,
813 session: &mut Session,
814 target_id: CatalogItemId,
815 constants: MirRelationExpr,
816 ) -> Result<ExecuteResponse, AdapterError> {
817 let desc = match catalog.try_get_entry(&target_id) {
819 Some(table) => {
820 table.relation_desc_latest().expect("table has desc")
822 }
823 None => {
824 return Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
825 kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
826 target_id.to_string(),
827 )),
828 }));
829 }
830 };
831
832 match constants.as_const() {
833 Some((rows, ..)) => {
834 let rows = rows.clone()?;
835 for (row, _) in &rows {
836 for (i, datum) in row.iter().enumerate() {
837 desc.constraints_met(i, &datum)?;
838 }
839 }
840 let diffs_plan = plan::SendDiffsPlan {
841 id: target_id,
842 updates: rows,
843 kind: MutationKind::Insert,
844 returning: Vec::new(),
845 max_result_size: catalog.system_config().max_result_size(),
846 };
847 Self::send_diffs(session, diffs_plan)
848 }
849 None => panic!(
850 "tried using sequence_insert_constant on non-constant MirRelationExpr\n{}",
851 constants.pretty(),
852 ),
853 }
854 }
855
    /// Stages the diffs in `plan` as writes on the session's transaction and
    /// builds the corresponding response (row counts, or RETURNING rows).
    #[mz_ore::instrument(level = "debug")]
    pub(crate) fn send_diffs(
        session: &mut Session,
        mut plan: plan::SendDiffsPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Count affected rows. The fast path (all-positive diffs, i.e. a pure
        // insert) just sums the diffs; otherwise consolidate first so that
        // matching insertions/retractions cancel, then sum absolute diffs.
        let affected_rows = {
            let mut affected_rows = Diff::from(0);
            let mut all_positive_diffs = true;
            for (_, diff) in plan.updates.iter() {
                if diff.is_negative() {
                    all_positive_diffs = false;
                    break;
                }

                affected_rows += diff;
            }

            if !all_positive_diffs {
                differential_dataflow::consolidation::consolidate(&mut plan.updates);

                affected_rows = Diff::ZERO;
                for (_, diff) in plan.updates.iter() {
                    affected_rows += diff.abs();
                }
            }

            usize::try_from(affected_rows.into_inner()).expect("positive Diff must fit")
        };
        event!(
            Level::TRACE,
            affected_rows,
            id = format!("{:?}", plan.id),
            kind = format!("{:?}", plan.kind),
            updates = plan.updates.len(),
            returning = plan.returning.len(),
        );

        // Stage the writes; they are applied when the transaction commits.
        session.add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
            id: plan.id,
            rows: TableData::Rows(plan.updates),
        }]))?;
        if !plan.returning.is_empty() {
            // RETURNING: project every column of the returned rows, with no
            // ordering, limit, or offset.
            let finishing = RowSetFinishing {
                order_by: Vec::new(),
                limit: None,
                offset: 0,
                project: (0..plan.returning[0].0.iter().count()).collect(),
            };
            let max_returned_query_size = session.vars().max_query_result_size();
            let duration_histogram = session.metrics().row_set_finishing_seconds();

            return match finishing.finish(
                RowCollection::new(plan.returning, &finishing.order_by),
                plan.max_result_size,
                Some(max_returned_query_size),
                duration_histogram,
            ) {
                Ok((rows, _size_bytes)) => Ok(Self::send_immediate_rows(rows)),
                Err(e) => Err(AdapterError::ResultSize(e)),
            };
        }
        Ok(match plan.kind {
            MutationKind::Delete => ExecuteResponse::Deleted(affected_rows),
            MutationKind::Insert => ExecuteResponse::Inserted(affected_rows),
            // Updates stage a retraction + insertion per changed row, so the
            // user-facing count is half the total diff count.
            MutationKind::Update => ExecuteResponse::Updated(affected_rows / 2),
        })
    }
932}
933
934pub(crate) fn check_log_reads(
943 catalog: &Catalog,
944 cluster: &Cluster,
945 source_ids: &BTreeSet<GlobalId>,
946 target_replica: &mut Option<ReplicaId>,
947 vars: &SessionVars,
948) -> Result<impl IntoIterator<Item = AdapterNotice>, AdapterError>
949where
950{
951 let log_names = source_ids
952 .iter()
953 .map(|gid| catalog.resolve_item_id(gid))
954 .flat_map(|item_id| catalog.introspection_dependencies(item_id))
955 .map(|item_id| catalog.get_entry(&item_id).name().item.clone())
956 .collect::<Vec<_>>();
957
958 if log_names.is_empty() {
959 return Ok(None);
960 }
961
962 let num_replicas = cluster.replicas().count();
966 if target_replica.is_none() {
967 if num_replicas == 1 {
968 *target_replica = cluster.replicas().map(|r| r.replica_id).next();
969 } else {
970 return Err(AdapterError::UntargetedLogRead { log_names });
971 }
972 }
973
974 let replica_id = target_replica.expect("set to `Some` above");
977 let replica = &cluster.replica(replica_id).expect("Replica must exist");
978 if !replica.config.compute.logging.enabled() {
979 return Err(AdapterError::IntrospectionDisabled { log_names });
980 }
981
982 Ok(vars
983 .emit_introspection_query_notice()
984 .then_some(AdapterNotice::PerReplicaLogRead { log_names }))
985}
986
987pub(crate) fn emit_optimizer_notices(
989 catalog: &Catalog,
990 session: &Session,
991 notices: &[RawOptimizerNotice],
992) {
993 if notices.is_empty() {
995 return;
996 }
997 let humanizer = catalog.for_session(session);
998 let system_vars = catalog.system_config();
999 for notice in notices {
1000 let kind = OptimizerNoticeKind::from(notice);
1001 let notice_enabled = match kind {
1002 OptimizerNoticeKind::EqualsNull => system_vars.enable_notices_for_equals_null(),
1003 OptimizerNoticeKind::IndexAlreadyExists => {
1004 system_vars.enable_notices_for_index_already_exists()
1005 }
1006 OptimizerNoticeKind::IndexTooWideForLiteralConstraints => {
1007 system_vars.enable_notices_for_index_too_wide_for_literal_constraints()
1008 }
1009 OptimizerNoticeKind::IndexKeyEmpty => system_vars.enable_notices_for_index_empty_key(),
1010 };
1011 if notice_enabled {
1012 session.add_notice(AdapterNotice::OptimizerNotice {
1016 notice: notice.message(&humanizer, false).to_string(),
1017 hint: notice.hint(&humanizer, false).to_string(),
1018 });
1019 }
1020 session
1021 .metrics()
1022 .optimization_notices(&[kind.metric_label()])
1023 .inc_by(1);
1024 }
1025}
1026
1027pub fn eval_copy_to_uri(
1032 to: HirScalarExpr,
1033 session: &Session,
1034 catalog_state: &CatalogState,
1035) -> Result<Uri, AdapterError> {
1036 let style = ExprPrepOneShot {
1037 logical_time: EvalTime::NotAvailable,
1038 session,
1039 catalog_state,
1040 };
1041 let mut to = to.lower_uncorrelated(catalog_state.system_config())?;
1042 style.prep_scalar_expr(&mut to)?;
1043 let temp_storage = RowArena::new();
1044 let evaled = to.eval(&[], &temp_storage)?;
1045 if evaled == Datum::Null {
1046 coord_bail!("COPY TO target value can not be null");
1047 }
1048 let to_url = match Uri::from_str(evaled.unwrap_str()) {
1049 Ok(url) => {
1050 if url.scheme_str() != Some("s3") && url.scheme_str() != Some("gs") {
1051 coord_bail!("only 's3://...' and 'gs://...' urls are supported as COPY TO target");
1052 }
1053 url
1054 }
1055 Err(e) => coord_bail!("could not parse COPY TO target url: {}", e),
1056 };
1057 Ok(to_url)
1058}
1059
/// Builds the future that computes `EXPLAIN FILTER PUSHDOWN` output rows.
///
/// Works in two phases: this async fn eagerly kicks off a part-stats snapshot
/// per import (while catalog references are still at hand), then returns a
/// second future that aggregates the stats into one output row per import,
/// bounded by the session's statement timeout.
pub(crate) async fn explain_pushdown_future_inner<
    I: IntoIterator<Item = (GlobalId, MapFilterProject)>,
>(
    session: &Session,
    catalog: &Catalog,
    storage_collections: &Arc<dyn StorageCollections + Send + Sync>,
    as_of: Antichain<Timestamp>,
    mz_now: ResultSpec<'static>,
    imports: I,
) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
    // A statement timeout of zero means "no timeout".
    let mut explain_timeout = *session.vars().statement_timeout();
    if explain_timeout == Duration::ZERO {
        explain_timeout = Duration::MAX;
    }
    // One future per import; FuturesOrdered keeps output rows in input order.
    let mut futures = FuturesOrdered::new();
    for (id, mfp) in imports {
        let catalog_entry = catalog.get_entry_by_global_id(&id);
        let full_name = catalog
            .for_session(session)
            .resolve_full_name(&catalog_entry.name);
        let name = format!("{}", full_name);
        let relation_desc = catalog_entry
            .relation_desc()
            .expect("source should have a proper desc")
            .into_owned();
        // Start the stats snapshot now; only await its result later.
        let stats_future = storage_collections
            .snapshot_parts_stats(id, as_of.clone())
            .await;

        let mz_now = mz_now.clone();
        futures.push_back(async move {
            let snapshot_stats = match stats_future.await {
                Ok(stats) => stats,
                Err(e) => return Err(e),
            };
            // Tally total vs. selected (i.e. not pushdown-filtered) parts.
            let mut total_bytes = 0;
            let mut total_parts = 0;
            let mut selected_bytes = 0;
            let mut selected_parts = 0;
            for SnapshotPartStats {
                encoded_size_bytes: bytes,
                stats,
            } in &snapshot_stats.parts
            {
                let bytes = u64::cast_from(*bytes);
                total_bytes += bytes;
                total_parts += 1u64;
                // A part with no stats must be conservatively selected.
                let selected = match stats {
                    None => true,
                    Some(stats) => {
                        let stats = stats.decode();
                        let stats = RelationPartStats::new(
                            name.as_str(),
                            &snapshot_stats.metrics.pushdown.part_stats,
                            &relation_desc,
                            &stats,
                        );
                        stats.may_match_mfp(mz_now.clone(), &mfp)
                    }
                };

                if selected {
                    selected_bytes += bytes;
                    selected_parts += 1u64;
                }
            }
            Ok(Row::pack_slice(&[
                name.as_str().into(),
                total_bytes.into(),
                selected_bytes.into(),
                total_parts.into(),
                selected_parts.into(),
            ]))
        });
    }

    // Phase two: collect all rows, bounded by the statement timeout.
    let fut = async move {
        match tokio::time::timeout(
            explain_timeout,
            futures::TryStreamExt::try_collect::<Vec<_>>(futures),
        )
        .await
        {
            Ok(Ok(rows)) => Ok(ExecuteResponse::SendingRowsImmediate {
                rows: Box::new(rows.into_row_iter()),
            }),
            Ok(Err(err)) => Err(err.into()),
            Err(_) => Err(AdapterError::StatementTimeout),
        }
    };
    fut
}
1163
1164pub(crate) async fn explain_plan_inner(
1167 session: &Session,
1168 catalog: &Catalog,
1169 df_meta: DataflowMetainfo,
1170 explain_ctx: ExplainPlanContext,
1171 optimizer: peek::Optimizer,
1172 insights_ctx: Option<Box<PlanInsightsContext>>,
1173) -> Result<Vec<Row>, AdapterError> {
1174 let ExplainPlanContext {
1175 config,
1176 format,
1177 stage,
1178 desc,
1179 optimizer_trace,
1180 ..
1181 } = explain_ctx;
1182
1183 let desc = desc.expect("RelationDesc for SelectPlan in EXPLAIN mode");
1184
1185 let session_catalog = catalog.for_session(session);
1186 let expr_humanizer = {
1187 let transient_items = btreemap! {
1188 optimizer.select_id() => TransientItem::new(
1189 Some(vec![GlobalId::Explain.to_string()]),
1190 Some(desc.iter_names().map(|c| c.to_string()).collect()),
1191 )
1192 };
1193 ExprHumanizerExt::new(transient_items, &session_catalog)
1194 };
1195
1196 let finishing = if optimizer.finishing().is_trivial(desc.arity()) {
1197 None
1198 } else {
1199 Some(optimizer.finishing().clone())
1200 };
1201
1202 let target_cluster = catalog.get_cluster(optimizer.cluster_id());
1203 let features = optimizer.config().features.clone();
1204
1205 let rows = optimizer_trace
1206 .into_rows(
1207 format,
1208 &config,
1209 &features,
1210 &expr_humanizer,
1211 finishing,
1212 Some(target_cluster),
1213 df_meta,
1214 stage,
1215 plan::ExplaineeStatementKind::Select,
1216 insights_ctx,
1217 )
1218 .await?;
1219
1220 Ok(rows)
1221}
1222
1223pub(crate) async fn statistics_oracle(
1228 session: &Session,
1229 source_ids: &BTreeSet<GlobalId>,
1230 query_as_of: &Antichain<Timestamp>,
1231 is_oneshot: bool,
1232 system_config: &vars::SystemVars,
1233 storage_collections: &dyn StorageCollections,
1234) -> Result<Box<dyn StatisticsOracle>, AdapterError> {
1235 if !session.vars().enable_session_cardinality_estimates() {
1236 let stats: Box<dyn StatisticsOracle> = Box::new(EmptyStatisticsOracle);
1237 return Ok(stats);
1238 }
1239
1240 let timeout = if is_oneshot {
1241 system_config.optimizer_oneshot_stats_timeout()
1243 } else {
1244 system_config.optimizer_stats_timeout()
1245 };
1246
1247 let cached_stats = mz_ore::future::timeout(
1248 timeout,
1249 CachedStatisticsOracle::new(source_ids, query_as_of, storage_collections),
1250 )
1251 .await;
1252
1253 match cached_stats {
1254 Ok(stats) => Ok(Box::new(stats)),
1255 Err(mz_ore::future::TimeoutError::DeadlineElapsed) => {
1256 warn!(
1257 is_oneshot = is_oneshot,
1258 "optimizer statistics collection timed out after {}ms",
1259 timeout.as_millis()
1260 );
1261
1262 Ok(Box::new(EmptyStatisticsOracle))
1263 }
1264 Err(mz_ore::future::TimeoutError::Inner(e)) => Err(AdapterError::Storage(e)),
1265 }
1266}
1267
/// A [`StatisticsOracle`] backed by a one-time snapshot of per-collection
/// update counts.
#[derive(Debug)]
struct CachedStatisticsOracle {
    // Number of updates per collection, captured at construction time.
    cache: BTreeMap<GlobalId, usize>,
}
1272
1273impl CachedStatisticsOracle {
1274 pub async fn new(
1275 ids: &BTreeSet<GlobalId>,
1276 as_of: &Antichain<Timestamp>,
1277 storage_collections: &dyn StorageCollections,
1278 ) -> Result<Self, StorageError> {
1279 let mut cache = BTreeMap::new();
1280
1281 for id in ids {
1282 let stats = storage_collections.snapshot_stats(*id, as_of.clone()).await;
1283
1284 match stats {
1285 Ok(stats) => {
1286 cache.insert(*id, stats.num_updates);
1287 }
1288 Err(StorageError::IdentifierMissing(id)) => {
1289 ::tracing::debug!("no statistics for {id}")
1290 }
1291 Err(e) => return Err(e),
1292 }
1293 }
1294
1295 Ok(Self { cache })
1296 }
1297}
1298
1299impl StatisticsOracle for CachedStatisticsOracle {
1300 fn cardinality_estimate(&self, id: GlobalId) -> Option<usize> {
1301 self.cache.get(&id).map(|estimate| *estimate)
1302 }
1303
1304 fn as_map(&self) -> BTreeMap<GlobalId, usize> {
1305 self.cache.clone()
1306 }
1307}