1#![allow(clippy::pub_use)]
12
13use std::collections::{BTreeMap, BTreeSet};
16use std::str::FromStr;
17use std::sync::Arc;
18use std::time::Duration;
19
20use futures::FutureExt;
21use futures::future::LocalBoxFuture;
22use futures::stream::FuturesOrdered;
23use http::Uri;
24use inner::return_if_err;
25use maplit::btreemap;
26use mz_catalog::memory::objects::Cluster;
27use mz_controller_types::ReplicaId;
28use mz_expr::row::RowCollection;
29use mz_expr::{Eval, MapFilterProject, MirRelationExpr, ResultSpec, RowSetFinishing};
30use mz_ore::cast::CastFrom;
31use mz_ore::tracing::OpenTelemetryContext;
32use mz_persist_client::stats::SnapshotPartStats;
33use mz_repr::explain::{ExprHumanizerExt, TransientItem};
34use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, Row, RowArena, Timestamp};
35use mz_sql::catalog::{CatalogError, SessionCatalog};
36use mz_sql::names::ResolvedIds;
37use mz_sql::plan::{
38 self, AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, CreateSourcePlanBundle,
39 FetchPlan, HirScalarExpr, MutationKind, Params, Plan, PlanKind, RaisePlan,
40};
41use mz_sql::rbac;
42use mz_sql::session::metadata::SessionMetadata;
43use mz_sql::session::vars;
44use mz_sql::session::vars::SessionVars;
45use mz_sql_parser::ast::{Raw, Statement};
46use mz_storage_client::client::TableData;
47use mz_storage_client::storage_collections::StorageCollections;
48use mz_storage_types::connections::inline::IntoInlineConnection;
49use mz_storage_types::controller::StorageError;
50use mz_storage_types::stats::RelationPartStats;
51use mz_transform::dataflow::DataflowMetainfo;
52use mz_transform::notice::{OptimizerNoticeApi, OptimizerNoticeKind, RawOptimizerNotice};
53use mz_transform::{EmptyStatisticsOracle, StatisticsOracle};
54use timely::progress::Antichain;
55use tokio::sync::oneshot;
56use tracing::{Instrument, Level, Span, event, warn};
57
58use crate::ExecuteContext;
59use crate::catalog::{Catalog, CatalogState};
60use crate::command::{Command, ExecuteResponse, Response};
61use crate::coord::appends::{DeferredOp, DeferredPlan};
62use crate::coord::validity::PlanValidity;
63use crate::coord::{
64 Coordinator, DeferredPlanStatement, ExplainPlanContext, Message, PlanStatement, TargetCluster,
65 catalog_serving,
66};
67use crate::error::AdapterError;
68use crate::explain::insights::PlanInsightsContext;
69use crate::notice::AdapterNotice;
70use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
71use crate::optimize::peek;
72use crate::session::{
73 EndTransactionAction, Session, StateRevision, TransactionOps, TransactionStatus, WriteOp,
74};
75use crate::util::ClientTransmitter;
76
77mod inner;
97
98impl Coordinator {
99 pub(crate) fn sequence_plan(
103 &mut self,
104 mut ctx: ExecuteContext,
105 plan: Plan,
106 resolved_ids: ResolvedIds,
107 sql_impl_resolved_ids: ResolvedIds,
108 ) -> LocalBoxFuture<'_, ()> {
109 async move {
110 let responses = ExecuteResponse::generated_from(&PlanKind::from(&plan));
111 ctx.tx_mut().set_allowed(responses);
112
113 if self.controller.read_only() && !plan.allowed_in_read_only() {
114 ctx.retire(Err(AdapterError::ReadOnly));
115 return;
116 }
117
118 if let Some((dependencies, wait_future)) =
121 super::appends::waiting_on_startup_appends(self.catalog(), ctx.session_mut(), &plan)
122 {
123 let conn_id = ctx.session().conn_id();
124 tracing::debug!(%conn_id, "deferring plan for startup appends");
125
126 let role_metadata = ctx.session().role_metadata().clone();
127 let validity = PlanValidity::new(
128 self.catalog.transient_revision(),
129 dependencies,
130 None,
131 None,
132 role_metadata,
133 );
134 let deferred_plan = DeferredPlan {
135 ctx,
136 plan,
137 validity,
138 requires_locks: BTreeSet::default(),
139 resolved_ids,
140 sql_impl_resolved_ids,
141 };
142 let acquire_future = wait_future.map(|()| None);
145
146 self.defer_op(acquire_future, DeferredOp::Plan(deferred_plan));
147
148 return;
151 };
152
153 let target_cluster = match ctx.session().transaction().cluster() {
155 Some(cluster_id) => TargetCluster::Transaction(cluster_id),
157 None => {
159 let session_catalog = self.catalog.for_session(ctx.session());
160 catalog_serving::auto_run_on_catalog_server(
161 &session_catalog,
162 ctx.session(),
163 &plan,
164 )
165 }
166 };
167 let (target_cluster_id, target_cluster_name) = match self
168 .catalog()
169 .resolve_target_cluster(target_cluster, ctx.session())
170 {
171 Ok(cluster) => (Some(cluster.id), Some(cluster.name.clone())),
172 Err(_) => (None, None),
173 };
174
175 if let (Some(cluster_id), Some(cluster_name), Some(statement_id)) = (
176 target_cluster_id,
177 target_cluster_name.clone(),
178 ctx.extra().contents(),
179 ) {
180 self.set_statement_execution_cluster(statement_id, cluster_id, cluster_name);
181 }
182
183 let session_catalog = self.catalog.for_session(ctx.session());
184
185 if let Some(cluster_name) = &target_cluster_name {
186 if let Err(e) = catalog_serving::check_cluster_restrictions(
187 cluster_name,
188 &session_catalog,
189 &plan,
190 ) {
191 return ctx.retire(Err(e));
192 }
193 }
194
195 if let Err(e) = rbac::check_plan(
196 &session_catalog,
197 Some(|id| {
198 self.active_conns()
201 .into_iter()
202 .find(|(conn_id, _)| conn_id.unhandled() == id)
203 .map(|(_, conn_meta)| *conn_meta.authenticated_role_id())
204 }),
205 ctx.session(),
206 &plan,
207 target_cluster_id,
208 &resolved_ids,
209 &sql_impl_resolved_ids,
210 ) {
211 return ctx.retire(Err(e.into()));
212 }
213
214 match plan {
215 Plan::CreateSource(plan) => {
216 let (item_id, global_id) = return_if_err!(self.allocate_user_id().await, ctx);
217 let result = self
218 .sequence_create_source(
219 &mut ctx,
220 vec![CreateSourcePlanBundle {
221 item_id,
222 global_id,
223 plan,
224 resolved_ids,
225 available_source_references: None,
226 }],
227 )
228 .await;
229 ctx.retire(result);
230 }
231 Plan::CreateSources(plans) => {
232 assert!(
233 resolved_ids.is_empty(),
234 "each plan has separate resolved_ids"
235 );
236 let result = self.sequence_create_source(&mut ctx, plans).await;
237 ctx.retire(result);
238 }
239 Plan::CreateConnection(plan) => {
240 self.sequence_create_connection(ctx, plan, resolved_ids)
241 .await;
242 }
243 Plan::CreateDatabase(plan) => {
244 let result = self.sequence_create_database(ctx.session_mut(), plan).await;
245 ctx.retire(result);
246 }
247 Plan::CreateSchema(plan) => {
248 let result = self.sequence_create_schema(ctx.session_mut(), plan).await;
249 ctx.retire(result);
250 }
251 Plan::CreateRole(plan) => {
252 let result = self
253 .sequence_create_role(Some(ctx.session().conn_id()), plan)
254 .await;
255 if let Some(notice) = self.should_emit_rbac_notice(ctx.session()) {
256 ctx.session().add_notice(notice);
257 }
258 ctx.retire(result);
259 }
260 Plan::CreateCluster(plan) => {
261 let result = self.sequence_create_cluster(ctx.session(), plan).await;
262 ctx.retire(result);
263 }
264 Plan::CreateClusterReplica(plan) => {
265 let result = self
266 .sequence_create_cluster_replica(ctx.session(), plan)
267 .await;
268 ctx.retire(result);
269 }
270 Plan::CreateTable(plan) => {
271 let result = self
272 .sequence_create_table(&mut ctx, plan, resolved_ids)
273 .await;
274 ctx.retire(result);
275 }
276 Plan::CreateSecret(plan) => {
277 self.sequence_create_secret(ctx, plan).await;
278 }
279 Plan::CreateSink(plan) => {
280 self.sequence_create_sink(ctx, plan, resolved_ids).await;
281 }
282 Plan::CreateView(plan) => {
283 self.sequence_create_view(ctx, plan, resolved_ids).await;
284 }
285 Plan::CreateMaterializedView(plan) => {
286 self.sequence_create_materialized_view(ctx, plan, resolved_ids)
287 .await;
288 }
289 Plan::CreateIndex(plan) => {
290 self.sequence_create_index(ctx, plan, resolved_ids).await;
291 }
292 Plan::CreateType(plan) => {
293 let result = self
294 .sequence_create_type(ctx.session(), plan, resolved_ids)
295 .await;
296 ctx.retire(result);
297 }
298 Plan::CreateNetworkPolicy(plan) => {
299 let res = self
300 .sequence_create_network_policy(ctx.session(), plan)
301 .await;
302 ctx.retire(res);
303 }
304 Plan::Comment(plan) => {
305 let result = self.sequence_comment_on(ctx.session(), plan).await;
306 ctx.retire(result);
307 }
308 Plan::CopyTo(plan) => {
309 self.sequence_copy_to(ctx, plan, target_cluster).await;
310 }
311 Plan::DropObjects(plan) => {
312 let result = self.sequence_drop_objects(&mut ctx, plan).await;
313 ctx.retire(result);
314 }
315 Plan::DropOwned(plan) => {
316 let result = self.sequence_drop_owned(ctx.session_mut(), plan).await;
317 ctx.retire(result);
318 }
319 Plan::EmptyQuery => {
320 ctx.retire(Ok(ExecuteResponse::EmptyQuery));
321 }
322 Plan::ShowAllVariables => {
323 let result = self.sequence_show_all_variables(ctx.session());
324 ctx.retire(result);
325 }
326 Plan::ShowVariable(plan) => {
327 let result = self.sequence_show_variable(ctx.session(), plan);
328 ctx.retire(result);
329 }
330 Plan::InspectShard(plan) => {
331 let result = self.sequence_inspect_shard(ctx.session(), plan).await;
333 ctx.retire(result);
334 }
335 Plan::SetVariable(plan) => {
336 let result = self.sequence_set_variable(ctx.session_mut(), plan);
337 ctx.retire(result);
338 }
339 Plan::ResetVariable(plan) => {
340 let result = self.sequence_reset_variable(ctx.session_mut(), plan);
341 ctx.retire(result);
342 }
343 Plan::SetTransaction(plan) => {
344 let result = self.sequence_set_transaction(ctx.session_mut(), plan);
345 ctx.retire(result);
346 }
347 Plan::StartTransaction(plan) => {
348 if matches!(
349 ctx.session().transaction(),
350 TransactionStatus::InTransaction(_)
351 ) {
352 ctx.session()
353 .add_notice(AdapterNotice::ExistingTransactionInProgress);
354 }
355 let result = ctx.session_mut().start_transaction(
356 self.now_datetime(),
357 plan.access,
358 plan.isolation_level,
359 );
360 ctx.retire(result.map(|_| ExecuteResponse::StartedTransaction))
361 }
362 Plan::CommitTransaction(CommitTransactionPlan {
363 ref transaction_type,
364 })
365 | Plan::AbortTransaction(AbortTransactionPlan {
366 ref transaction_type,
367 }) => {
368 if ctx.session().transaction().is_ddl() {
371 if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
372 let prev = self
373 .active_conns
374 .get_mut(ctx.session().conn_id())
375 .expect("connection must exist")
376 .deferred_lock
377 .replace(guard);
378 assert!(
379 prev.is_none(),
380 "connections should have at most one lock guard"
381 );
382 } else {
383 self.serialized_ddl.push_back(DeferredPlanStatement {
384 ctx,
385 ps: PlanStatement::Plan {
386 plan,
387 resolved_ids,
388 sql_impl_resolved_ids,
389 },
390 });
391 return;
392 }
393 }
394
395 let action = match &plan {
396 Plan::CommitTransaction(_) => EndTransactionAction::Commit,
397 Plan::AbortTransaction(_) => EndTransactionAction::Rollback,
398 _ => unreachable!(),
399 };
400 if ctx.session().transaction().is_implicit() && !transaction_type.is_implicit()
401 {
402 ctx.session().add_notice(
407 AdapterNotice::ExplicitTransactionControlInImplicitTransaction,
408 );
409 }
410 self.sequence_end_transaction(ctx, action).await;
411 }
412 Plan::Select(plan) => {
413 let max = Some(ctx.session().vars().max_query_result_size());
414 self.sequence_peek(ctx, plan, target_cluster, max).await;
415 }
416 Plan::Subscribe(plan) => {
417 self.sequence_subscribe(ctx, plan, target_cluster).await;
418 }
419 Plan::SideEffectingFunc(plan) => {
420 self.sequence_side_effecting_func(ctx, plan).await;
421 }
422 Plan::ShowCreate(plan) => {
423 ctx.retire(Ok(Self::send_immediate_rows(plan.row)));
424 }
425 Plan::ShowColumns(show_columns_plan) => {
426 let max = Some(ctx.session().vars().max_query_result_size());
427 self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster, max)
428 .await;
429 }
430 Plan::CopyFrom(plan) => {
431 self.sequence_copy_from(ctx, plan, target_cluster).await;
432 }
433 Plan::ExplainPlan(plan) => {
434 self.sequence_explain_plan(ctx, plan, target_cluster).await;
435 }
436 Plan::ExplainPushdown(plan) => {
437 self.sequence_explain_pushdown(ctx, plan, target_cluster)
438 .await;
439 }
440 Plan::ExplainSinkSchema(plan) => {
441 let result = self.sequence_explain_schema(plan);
442 ctx.retire(result);
443 }
444 Plan::ExplainTimestamp(plan) => {
445 self.sequence_explain_timestamp(ctx, plan, target_cluster)
446 .await;
447 }
448 Plan::Insert(plan) => {
449 self.sequence_insert(ctx, plan).await;
450 }
451 Plan::ReadThenWrite(plan) => {
452 self.sequence_read_then_write(ctx, plan).await;
453 }
454 Plan::AlterNoop(plan) => {
455 ctx.retire(Ok(ExecuteResponse::AlteredObject(plan.object_type)));
456 }
457 Plan::AlterCluster(plan) => {
458 self.sequence_alter_cluster_staged(ctx, plan).await;
459 }
460 Plan::AlterClusterRename(plan) => {
461 let result = self.sequence_alter_cluster_rename(&mut ctx, plan).await;
462 ctx.retire(result);
463 }
464 Plan::AlterClusterSwap(plan) => {
465 let result = self.sequence_alter_cluster_swap(&mut ctx, plan).await;
466 ctx.retire(result);
467 }
468 Plan::AlterClusterReplicaRename(plan) => {
469 let result = self
470 .sequence_alter_cluster_replica_rename(ctx.session(), plan)
471 .await;
472 ctx.retire(result);
473 }
474 Plan::AlterConnection(plan) => {
475 self.sequence_alter_connection(ctx, plan).await;
476 }
477 Plan::AlterSetCluster(plan) => {
478 let result = self.sequence_alter_set_cluster(ctx.session(), plan).await;
479 ctx.retire(result);
480 }
481 Plan::AlterRetainHistory(plan) => {
482 let result = self.sequence_alter_retain_history(&mut ctx, plan).await;
483 ctx.retire(result);
484 }
485 Plan::AlterSourceTimestampInterval(plan) => {
486 let result = self
487 .sequence_alter_source_timestamp_interval(&mut ctx, plan)
488 .await;
489 ctx.retire(result);
490 }
491 Plan::AlterItemRename(plan) => {
492 let result = self.sequence_alter_item_rename(&mut ctx, plan).await;
493 ctx.retire(result);
494 }
495 Plan::AlterSchemaRename(plan) => {
496 let result = self.sequence_alter_schema_rename(&mut ctx, plan).await;
497 ctx.retire(result);
498 }
499 Plan::AlterSchemaSwap(plan) => {
500 let result = self.sequence_alter_schema_swap(&mut ctx, plan).await;
501 ctx.retire(result);
502 }
503 Plan::AlterRole(plan) => {
504 let result = self.sequence_alter_role(ctx.session_mut(), plan).await;
505 ctx.retire(result);
506 }
507 Plan::AlterSecret(plan) => {
508 self.sequence_alter_secret(ctx, plan).await;
509 }
510 Plan::AlterSink(plan) => {
511 self.sequence_alter_sink_prepare(ctx, plan).await;
512 }
513 Plan::AlterSource(plan) => {
514 let result = self.sequence_alter_source(ctx.session_mut(), plan).await;
515 ctx.retire(result);
516 }
517 Plan::AlterSystemSet(plan) => {
518 let result = self.sequence_alter_system_set(ctx.session(), plan).await;
519 ctx.retire(result);
520 }
521 Plan::AlterSystemReset(plan) => {
522 let result = self.sequence_alter_system_reset(ctx.session(), plan).await;
523 ctx.retire(result);
524 }
525 Plan::AlterSystemResetAll(plan) => {
526 let result = self
527 .sequence_alter_system_reset_all(ctx.session(), plan)
528 .await;
529 ctx.retire(result);
530 }
531 Plan::AlterTableAddColumn(plan) => {
532 let result = self.sequence_alter_table(&mut ctx, plan).await;
533 ctx.retire(result);
534 }
535 Plan::AlterMaterializedViewApplyReplacement(plan) => {
536 self.sequence_alter_materialized_view_apply_replacement_prepare(ctx, plan)
537 .await;
538 }
539 Plan::AlterNetworkPolicy(plan) => {
540 let res = self
541 .sequence_alter_network_policy(ctx.session(), plan)
542 .await;
543 ctx.retire(res);
544 }
545 Plan::DiscardTemp => {
546 self.drop_temp_items(ctx.session().conn_id()).await;
547 ctx.retire(Ok(ExecuteResponse::DiscardedTemp));
548 }
549 Plan::DiscardAll => {
550 let ret = if let TransactionStatus::Started(_) = ctx.session().transaction() {
551 self.clear_transaction(ctx.session_mut()).await;
552 self.drop_temp_items(ctx.session().conn_id()).await;
553 ctx.session_mut().reset();
554 Ok(ExecuteResponse::DiscardedAll)
555 } else {
556 Err(AdapterError::OperationProhibitsTransaction(
557 "DISCARD ALL".into(),
558 ))
559 };
560 ctx.retire(ret);
561 }
562 Plan::Declare(plan) => {
563 self.declare(ctx, plan.name, plan.stmt, plan.sql, plan.params);
564 }
565 Plan::Fetch(FetchPlan {
566 name,
567 count,
568 timeout,
569 }) => {
570 let ctx_extra = std::mem::take(ctx.extra_mut());
571 ctx.retire(Ok(ExecuteResponse::Fetch {
572 name,
573 count,
574 timeout,
575 ctx_extra,
576 }));
577 }
578 Plan::Close(plan) => {
579 if ctx.session_mut().remove_portal(&plan.name) {
580 ctx.retire(Ok(ExecuteResponse::ClosedCursor));
581 } else {
582 ctx.retire(Err(AdapterError::UnknownCursor(plan.name)));
583 }
584 }
585 Plan::Prepare(plan) => {
586 if ctx
587 .session()
588 .get_prepared_statement_unverified(&plan.name)
589 .is_some()
590 {
591 ctx.retire(Err(AdapterError::PreparedStatementExists(plan.name)));
592 } else {
593 let state_revision = StateRevision {
594 catalog_revision: self.catalog().transient_revision(),
595 session_state_revision: ctx.session().state_revision(),
596 };
597 ctx.session_mut().set_prepared_statement(
598 plan.name,
599 Some(plan.stmt),
600 plan.sql,
601 plan.desc,
602 state_revision,
603 self.now(),
604 );
605 ctx.retire(Ok(ExecuteResponse::Prepare));
606 }
607 }
608 Plan::Execute(plan) => {
609 match self.sequence_execute(ctx.session_mut(), plan) {
610 Ok(portal_name) => {
611 let (tx, _, session, extra) = ctx.into_parts();
612 self.internal_cmd_tx
613 .send(Message::Command(
614 OpenTelemetryContext::obtain(),
615 Command::Execute {
616 portal_name,
617 session,
618 tx: tx.take(),
619 outer_ctx_extra: Some(extra),
620 },
621 ))
622 .expect("sending to self.internal_cmd_tx cannot fail");
623 }
624 Err(err) => ctx.retire(Err(err)),
625 };
626 }
627 Plan::Deallocate(plan) => match plan.name {
628 Some(name) => {
629 if ctx.session_mut().remove_prepared_statement(&name) {
630 ctx.retire(Ok(ExecuteResponse::Deallocate { all: false }));
631 } else {
632 ctx.retire(Err(AdapterError::UnknownPreparedStatement(name)));
633 }
634 }
635 None => {
636 ctx.session_mut().remove_all_prepared_statements();
637 ctx.retire(Ok(ExecuteResponse::Deallocate { all: true }));
638 }
639 },
640 Plan::Raise(RaisePlan { severity }) => {
641 ctx.session()
642 .add_notice(AdapterNotice::UserRequested { severity });
643 ctx.retire(Ok(ExecuteResponse::Raised));
644 }
645 Plan::GrantPrivileges(plan) => {
646 let result = self
647 .sequence_grant_privileges(ctx.session_mut(), plan)
648 .await;
649 ctx.retire(result);
650 }
651 Plan::RevokePrivileges(plan) => {
652 let result = self
653 .sequence_revoke_privileges(ctx.session_mut(), plan)
654 .await;
655 ctx.retire(result);
656 }
657 Plan::AlterDefaultPrivileges(plan) => {
658 let result = self
659 .sequence_alter_default_privileges(ctx.session_mut(), plan)
660 .await;
661 ctx.retire(result);
662 }
663 Plan::GrantRole(plan) => {
664 let result = self.sequence_grant_role(ctx.session_mut(), plan).await;
665 ctx.retire(result);
666 }
667 Plan::RevokeRole(plan) => {
668 let result = self.sequence_revoke_role(ctx.session_mut(), plan).await;
669 ctx.retire(result);
670 }
671 Plan::AlterOwner(plan) => {
672 let result = self.sequence_alter_owner(ctx.session_mut(), plan).await;
673 ctx.retire(result);
674 }
675 Plan::ReassignOwned(plan) => {
676 let result = self.sequence_reassign_owned(ctx.session_mut(), plan).await;
677 ctx.retire(result);
678 }
679 Plan::ValidateConnection(plan) => {
680 let connection = plan
681 .connection
682 .into_inline_connection(self.catalog().state());
683 let current_storage_configuration = self.controller.storage.config().clone();
684 mz_ore::task::spawn(|| "coord::validate_connection", async move {
685 let res = match connection
686 .validate(plan.id, ¤t_storage_configuration)
687 .await
688 {
689 Ok(()) => Ok(ExecuteResponse::ValidatedConnection),
690 Err(err) => Err(err.into()),
691 };
692 ctx.retire(res);
693 });
694 }
695 }
696 }
697 .instrument(tracing::debug_span!("coord::sequencer::sequence_plan"))
698 .boxed_local()
699 }
700
701 #[mz_ore::instrument(level = "debug")]
702 pub(crate) async fn sequence_execute_single_statement_transaction(
703 &mut self,
704 ctx: ExecuteContext,
705 stmt: Arc<Statement<Raw>>,
706 params: Params,
707 ) {
708 let (tx, internal_cmd_tx, mut session, extra) = ctx.into_parts();
710 assert!(matches!(session.transaction(), TransactionStatus::Default));
711 session.start_transaction_single_stmt(self.now_datetime());
712 let conn_id = session.conn_id().unhandled();
713
714 let (sub_tx, sub_rx) = oneshot::channel();
716 let sub_ct = ClientTransmitter::new(sub_tx, self.internal_cmd_tx.clone());
717 let sub_ctx = ExecuteContext::from_parts(sub_ct, internal_cmd_tx, session, extra);
718 self.handle_execute_inner(stmt, params, sub_ctx).await;
719
720 let internal_cmd_tx = self.internal_cmd_tx.clone();
723 mz_ore::task::spawn(
724 || format!("execute_single_statement:{conn_id}"),
725 async move {
726 let Ok(Response {
727 result,
728 session,
729 otel_ctx,
730 }) = sub_rx.await
731 else {
732 return;
734 };
735 otel_ctx.attach_as_parent();
736 let (sub_tx, sub_rx) = oneshot::channel();
737 let _ = internal_cmd_tx.send(Message::Command(
738 otel_ctx,
739 Command::Commit {
740 action: EndTransactionAction::Commit,
741 session,
742 tx: sub_tx,
743 },
744 ));
745 let Ok(commit_response) = sub_rx.await else {
746 return;
748 };
749 assert!(matches!(
750 commit_response.session.transaction(),
751 TransactionStatus::Default
752 ));
753 let result = match (result, commit_response.result) {
759 (Ok(_), commit) => commit,
760 (Err(result), _) => Err(result),
761 };
762 tx.send(result, commit_response.session);
765 }
766 .instrument(Span::current()),
767 );
768 }
769
770 #[mz_ore::instrument(level = "debug")]
774 pub(crate) async fn sequence_create_role_for_startup(
775 &mut self,
776 plan: CreateRolePlan,
777 ) -> Result<ExecuteResponse, AdapterError> {
778 self.sequence_create_role(None, plan).await
785 }
786
787 pub(crate) fn allocate_transient_id(&self) -> (CatalogItemId, GlobalId) {
788 self.transient_id_gen.allocate_id()
789 }
790
791 fn should_emit_rbac_notice(&self, session: &Session) -> Option<AdapterNotice> {
792 if !rbac::is_rbac_enabled_for_session(self.catalog.system_config(), session) {
793 Some(AdapterNotice::RbacUserDisabled)
794 } else {
795 None
796 }
797 }
798
799 pub(crate) fn insert_constant(
806 catalog: &Catalog,
807 session: &mut Session,
808 target_id: CatalogItemId,
809 constants: MirRelationExpr,
810 ) -> Result<ExecuteResponse, AdapterError> {
811 let desc = match catalog.try_get_entry(&target_id) {
813 Some(table) => {
814 table.relation_desc_latest().expect("table has desc")
816 }
817 None => {
818 return Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
819 kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
820 target_id.to_string(),
821 )),
822 }));
823 }
824 };
825
826 match constants.as_const() {
827 Some((rows, ..)) => {
828 let rows = rows.clone()?;
829 for (row, _) in &rows {
830 for (i, datum) in row.iter().enumerate() {
831 desc.constraints_met(i, &datum)?;
832 }
833 }
834 let diffs_plan = plan::SendDiffsPlan {
835 id: target_id,
836 updates: rows,
837 kind: MutationKind::Insert,
838 returning: Vec::new(),
839 max_result_size: catalog.system_config().max_result_size(),
840 };
841 Self::send_diffs(session, diffs_plan)
842 }
843 None => panic!(
844 "tried using sequence_insert_constant on non-constant MirRelationExpr\n{}",
845 constants.pretty(),
846 ),
847 }
848 }
849
850 #[mz_ore::instrument(level = "debug")]
851 pub(crate) fn send_diffs(
852 session: &mut Session,
853 mut plan: plan::SendDiffsPlan,
854 ) -> Result<ExecuteResponse, AdapterError> {
855 let affected_rows = {
856 let mut affected_rows = Diff::from(0);
857 let mut all_positive_diffs = true;
858 for (_, diff) in plan.updates.iter() {
861 if diff.is_negative() {
862 all_positive_diffs = false;
863 break;
864 }
865
866 affected_rows += diff;
867 }
868
869 if !all_positive_diffs {
870 differential_dataflow::consolidation::consolidate(&mut plan.updates);
874
875 affected_rows = Diff::ZERO;
876 for (_, diff) in plan.updates.iter() {
881 affected_rows += diff.abs();
882 }
883 }
884
885 usize::try_from(affected_rows.into_inner()).expect("positive Diff must fit")
886 };
887 event!(
888 Level::TRACE,
889 affected_rows,
890 id = format!("{:?}", plan.id),
891 kind = format!("{:?}", plan.kind),
892 updates = plan.updates.len(),
893 returning = plan.returning.len(),
894 );
895
896 session.add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
897 id: plan.id,
898 rows: TableData::Rows(plan.updates),
899 }]))?;
900 if !plan.returning.is_empty() {
901 let finishing = RowSetFinishing {
902 order_by: Vec::new(),
903 limit: None,
904 offset: 0,
905 project: (0..plan.returning[0].0.iter().count()).collect(),
906 };
907 let max_returned_query_size = session.vars().max_query_result_size();
908 let duration_histogram = session.metrics().row_set_finishing_seconds();
909
910 return match finishing.finish(
911 RowCollection::new(plan.returning, &finishing.order_by),
912 plan.max_result_size,
913 Some(max_returned_query_size),
914 duration_histogram,
915 ) {
916 Ok((rows, _size_bytes)) => Ok(Self::send_immediate_rows(rows)),
917 Err(e) => Err(AdapterError::ResultSize(e)),
918 };
919 }
920 Ok(match plan.kind {
921 MutationKind::Delete => ExecuteResponse::Deleted(affected_rows),
922 MutationKind::Insert => ExecuteResponse::Inserted(affected_rows),
923 MutationKind::Update => ExecuteResponse::Updated(affected_rows / 2),
924 })
925 }
926}
927
928pub(crate) fn check_log_reads(
937 catalog: &Catalog,
938 cluster: &Cluster,
939 source_ids: &BTreeSet<GlobalId>,
940 target_replica: &mut Option<ReplicaId>,
941 vars: &SessionVars,
942) -> Result<impl IntoIterator<Item = AdapterNotice>, AdapterError>
943where
944{
945 let log_names = source_ids
946 .iter()
947 .map(|gid| catalog.resolve_item_id(gid))
948 .flat_map(|item_id| catalog.introspection_dependencies(item_id))
949 .map(|item_id| catalog.get_entry(&item_id).name().item.clone())
950 .collect::<Vec<_>>();
951
952 if log_names.is_empty() {
953 return Ok(None);
954 }
955
956 let num_replicas = cluster.replicas().count();
960 if target_replica.is_none() {
961 if num_replicas == 1 {
962 *target_replica = cluster.replicas().map(|r| r.replica_id).next();
963 } else {
964 return Err(AdapterError::UntargetedLogRead { log_names });
965 }
966 }
967
968 let replica_id = target_replica.expect("set to `Some` above");
971 let replica = &cluster.replica(replica_id).expect("Replica must exist");
972 if !replica.config.compute.logging.enabled() {
973 return Err(AdapterError::IntrospectionDisabled { log_names });
974 }
975
976 Ok(vars
977 .emit_introspection_query_notice()
978 .then_some(AdapterNotice::PerReplicaLogRead { log_names }))
979}
980
981pub(crate) fn emit_optimizer_notices(
983 catalog: &Catalog,
984 session: &Session,
985 notices: &[RawOptimizerNotice],
986) {
987 if notices.is_empty() {
989 return;
990 }
991 let humanizer = catalog.for_session(session);
992 let system_vars = catalog.system_config();
993 for notice in notices {
994 let kind = OptimizerNoticeKind::from(notice);
995 let notice_enabled = match kind {
996 OptimizerNoticeKind::EqualsNull => system_vars.enable_notices_for_equals_null(),
997 OptimizerNoticeKind::IndexAlreadyExists => {
998 system_vars.enable_notices_for_index_already_exists()
999 }
1000 OptimizerNoticeKind::IndexTooWideForLiteralConstraints => {
1001 system_vars.enable_notices_for_index_too_wide_for_literal_constraints()
1002 }
1003 OptimizerNoticeKind::IndexKeyEmpty => system_vars.enable_notices_for_index_empty_key(),
1004 };
1005 if notice_enabled {
1006 session.add_notice(AdapterNotice::OptimizerNotice {
1010 notice: notice.message(&humanizer, false).to_string(),
1011 hint: notice.hint(&humanizer, false).to_string(),
1012 });
1013 }
1014 session
1015 .metrics()
1016 .optimization_notices(&[kind.metric_label()])
1017 .inc_by(1);
1018 }
1019}
1020
1021pub fn eval_copy_to_uri(
1026 to: HirScalarExpr,
1027 session: &Session,
1028 catalog_state: &CatalogState,
1029) -> Result<Uri, AdapterError> {
1030 let style = ExprPrepOneShot {
1031 logical_time: EvalTime::NotAvailable,
1032 session,
1033 catalog_state,
1034 };
1035 let mut to = to.lower_uncorrelated(catalog_state.system_config())?;
1036 style.prep_scalar_expr(&mut to)?;
1037 let temp_storage = RowArena::new();
1038 let evaled = to.eval(&[], &temp_storage)?;
1039 if evaled == Datum::Null {
1040 coord_bail!("COPY TO target value can not be null");
1041 }
1042 let to_url = match Uri::from_str(evaled.unwrap_str()) {
1043 Ok(url) => {
1044 if url.scheme_str() != Some("s3") && url.scheme_str() != Some("gs") {
1045 coord_bail!("only 's3://...' and 'gs://...' urls are supported as COPY TO target");
1046 }
1047 url
1048 }
1049 Err(e) => coord_bail!("could not parse COPY TO target url: {}", e),
1050 };
1051 Ok(to_url)
1052}
1053
1054pub(crate) async fn explain_pushdown_future_inner<
1061 I: IntoIterator<Item = (GlobalId, MapFilterProject)>,
1062>(
1063 session: &Session,
1064 catalog: &Catalog,
1065 storage_collections: &Arc<dyn StorageCollections + Send + Sync>,
1066 as_of: Antichain<Timestamp>,
1067 mz_now: ResultSpec<'static>,
1068 imports: I,
1069) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
1070 let mut explain_timeout = *session.vars().statement_timeout();
1071 if explain_timeout == Duration::ZERO {
1073 explain_timeout = Duration::MAX;
1074 }
1075 let mut futures = FuturesOrdered::new();
1076 for (id, mfp) in imports {
1077 let catalog_entry = catalog.get_entry_by_global_id(&id);
1078 let full_name = catalog
1079 .for_session(session)
1080 .resolve_full_name(&catalog_entry.name);
1081 let name = format!("{}", full_name);
1082 let relation_desc = catalog_entry
1083 .relation_desc()
1084 .expect("source should have a proper desc")
1085 .into_owned();
1086 let stats_future = storage_collections
1087 .snapshot_parts_stats(id, as_of.clone())
1088 .await;
1089
1090 let mz_now = mz_now.clone();
1091 futures.push_back(async move {
1096 let snapshot_stats = match stats_future.await {
1097 Ok(stats) => stats,
1098 Err(e) => return Err(e),
1099 };
1100 let mut total_bytes = 0;
1101 let mut total_parts = 0;
1102 let mut selected_bytes = 0;
1103 let mut selected_parts = 0;
1104 for SnapshotPartStats {
1105 encoded_size_bytes: bytes,
1106 stats,
1107 } in &snapshot_stats.parts
1108 {
1109 let bytes = u64::cast_from(*bytes);
1110 total_bytes += bytes;
1111 total_parts += 1u64;
1112 let selected = match stats {
1113 None => true,
1114 Some(stats) => {
1115 let stats = stats.decode();
1116 let stats = RelationPartStats::new(
1117 name.as_str(),
1118 &snapshot_stats.metrics.pushdown.part_stats,
1119 &relation_desc,
1120 &stats,
1121 );
1122 stats.may_match_mfp(mz_now.clone(), &mfp)
1123 }
1124 };
1125
1126 if selected {
1127 selected_bytes += bytes;
1128 selected_parts += 1u64;
1129 }
1130 }
1131 Ok(Row::pack_slice(&[
1132 name.as_str().into(),
1133 total_bytes.into(),
1134 selected_bytes.into(),
1135 total_parts.into(),
1136 selected_parts.into(),
1137 ]))
1138 });
1139 }
1140
1141 let fut = async move {
1142 match tokio::time::timeout(
1143 explain_timeout,
1144 futures::TryStreamExt::try_collect::<Vec<_>>(futures),
1145 )
1146 .await
1147 {
1148 Ok(Ok(rows)) => Ok(ExecuteResponse::SendingRowsImmediate {
1149 rows: Box::new(rows.into_row_iter()),
1150 }),
1151 Ok(Err(err)) => Err(err.into()),
1152 Err(_) => Err(AdapterError::StatementTimeout),
1153 }
1154 };
1155 fut
1156}
1157
1158pub(crate) async fn explain_plan_inner(
1161 session: &Session,
1162 catalog: &Catalog,
1163 df_meta: DataflowMetainfo,
1164 explain_ctx: ExplainPlanContext,
1165 optimizer: peek::Optimizer,
1166 insights_ctx: Option<Box<PlanInsightsContext>>,
1167) -> Result<Vec<Row>, AdapterError> {
1168 let ExplainPlanContext {
1169 config,
1170 format,
1171 stage,
1172 desc,
1173 optimizer_trace,
1174 ..
1175 } = explain_ctx;
1176
1177 let desc = desc.expect("RelationDesc for SelectPlan in EXPLAIN mode");
1178
1179 let session_catalog = catalog.for_session(session);
1180 let expr_humanizer = {
1181 let transient_items = btreemap! {
1182 optimizer.select_id() => TransientItem::new(
1183 Some(vec![GlobalId::Explain.to_string()]),
1184 Some(desc.iter_names().map(|c| c.to_string()).collect()),
1185 )
1186 };
1187 ExprHumanizerExt::new(transient_items, &session_catalog)
1188 };
1189
1190 let finishing = if optimizer.finishing().is_trivial(desc.arity()) {
1191 None
1192 } else {
1193 Some(optimizer.finishing().clone())
1194 };
1195
1196 let target_cluster = catalog.get_cluster(optimizer.cluster_id());
1197 let features = optimizer.config().features.clone();
1198
1199 let rows = optimizer_trace
1200 .into_rows(
1201 format,
1202 &config,
1203 &features,
1204 &expr_humanizer,
1205 finishing,
1206 Some(target_cluster),
1207 df_meta,
1208 stage,
1209 plan::ExplaineeStatementKind::Select,
1210 insights_ctx,
1211 )
1212 .await?;
1213
1214 Ok(rows)
1215}
1216
1217pub(crate) async fn statistics_oracle(
1222 session: &Session,
1223 source_ids: &BTreeSet<GlobalId>,
1224 query_as_of: &Antichain<Timestamp>,
1225 is_oneshot: bool,
1226 system_config: &vars::SystemVars,
1227 storage_collections: &dyn StorageCollections,
1228) -> Result<Box<dyn StatisticsOracle>, AdapterError> {
1229 if !session.vars().enable_session_cardinality_estimates() {
1230 let stats: Box<dyn StatisticsOracle> = Box::new(EmptyStatisticsOracle);
1231 return Ok(stats);
1232 }
1233
1234 let timeout = if is_oneshot {
1235 system_config.optimizer_oneshot_stats_timeout()
1237 } else {
1238 system_config.optimizer_stats_timeout()
1239 };
1240
1241 let cached_stats = mz_ore::future::timeout(
1242 timeout,
1243 CachedStatisticsOracle::new(source_ids, query_as_of, storage_collections),
1244 )
1245 .await;
1246
1247 match cached_stats {
1248 Ok(stats) => Ok(Box::new(stats)),
1249 Err(mz_ore::future::TimeoutError::DeadlineElapsed) => {
1250 warn!(
1251 is_oneshot = is_oneshot,
1252 "optimizer statistics collection timed out after {}ms",
1253 timeout.as_millis()
1254 );
1255
1256 Ok(Box::new(EmptyStatisticsOracle))
1257 }
1258 Err(mz_ore::future::TimeoutError::Inner(e)) => Err(AdapterError::Storage(e)),
1259 }
1260}
1261
/// A statistics oracle that answers from a map filled once, up front, by
/// `CachedStatisticsOracle::new`.
#[derive(Debug)]
struct CachedStatisticsOracle {
    /// Per-collection value returned as the cardinality estimate for that id
    /// (the `num_updates` of the collection's snapshot statistics).
    cache: BTreeMap<GlobalId, usize>,
}
1266
1267impl CachedStatisticsOracle {
1268 pub async fn new(
1269 ids: &BTreeSet<GlobalId>,
1270 as_of: &Antichain<Timestamp>,
1271 storage_collections: &dyn StorageCollections,
1272 ) -> Result<Self, StorageError> {
1273 let mut cache = BTreeMap::new();
1274
1275 for id in ids {
1276 let stats = storage_collections.snapshot_stats(*id, as_of.clone()).await;
1277
1278 match stats {
1279 Ok(stats) => {
1280 cache.insert(*id, stats.num_updates);
1281 }
1282 Err(StorageError::IdentifierMissing(id)) => {
1283 ::tracing::debug!("no statistics for {id}")
1284 }
1285 Err(e) => return Err(e),
1286 }
1287 }
1288
1289 Ok(Self { cache })
1290 }
1291}
1292
1293impl StatisticsOracle for CachedStatisticsOracle {
1294 fn cardinality_estimate(&self, id: GlobalId) -> Option<usize> {
1295 self.cache.get(&id).map(|estimate| *estimate)
1296 }
1297
1298 fn as_map(&self) -> BTreeMap<GlobalId, usize> {
1299 self.cache.clone()
1300 }
1301}