1use std::collections::BTreeMap;
11use std::collections::BTreeSet;
12use std::sync::Arc;
13use std::time::Duration;
14
15use itertools::Itertools;
16use mz_adapter_types::dyncfgs::ENABLE_FRONTEND_SUBSCRIBES;
17use mz_compute_types::ComputeInstanceId;
18use mz_compute_types::dataflows::DataflowDescription;
19use mz_controller_types::ClusterId;
20use mz_expr::{CollectionPlan, ResultSpec, RowSetFinishing};
21use mz_ore::cast::{CastFrom, CastLossy};
22use mz_ore::collections::CollectionExt;
23use mz_ore::now::EpochMillis;
24use mz_ore::task::JoinHandle;
25use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
26use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
27use mz_repr::role_id::RoleId;
28use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
29use mz_sql::ast::Raw;
30use mz_sql::catalog::CatalogCluster;
31use mz_sql::plan::Params;
32use mz_sql::plan::{
33 self, Explainee, ExplaineeStatement, Plan, QueryWhen, SelectPlan, SubscribePlan,
34};
35use mz_sql::rbac;
36use mz_sql::session::metadata::SessionMetadata;
37use mz_sql::session::vars::IsolationLevel;
38use mz_sql_parser::ast::{CopyDirection, ExplainStage, ShowStatement, Statement};
39use mz_transform::EmptyStatisticsOracle;
40use mz_transform::dataflow::DataflowMetainfo;
41use opentelemetry::trace::TraceContextExt;
42use timely::progress::Antichain;
43use tracing::{Span, debug, warn};
44use tracing_opentelemetry::OpenTelemetrySpanExt;
45
46use crate::catalog::Catalog;
47use crate::command::Command;
48use crate::coord::peek::{FastPathPlan, PeekPlan};
49use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle};
50use crate::coord::timeline::timedomain_for;
51use crate::coord::timestamp_selection::TimestampDetermination;
52use crate::coord::{
53 Coordinator, CopyToContext, ExecuteContextGuard, ExplainContext, ExplainPlanContext,
54 TargetCluster,
55};
56use crate::explain::insights::PlanInsightsContext;
57use crate::explain::optimizer_trace::OptimizerTrace;
58use crate::optimize::Optimize;
59use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
60use crate::session::{Session, TransactionOps, TransactionStatus};
61use crate::statement_logging::WatchSetCreation;
62use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
63use crate::{
64 AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, PeekClient, ReadHolds,
65 TimelineContext, TimestampContext, TimestampProvider, optimize,
66};
67use crate::{coord, metrics};
68
69impl PeekClient {
70 pub(crate) async fn try_frontend_peek(
78 &mut self,
79 portal_name: &str,
80 catalog: Option<Arc<Catalog>>,
81 session: &mut Session,
82 outer_ctx_extra: &mut Option<ExecuteContextGuard>,
83 ) -> Result<Option<ExecuteResponse>, AdapterError> {
84 if session.vars().emit_trace_id_notice() {
87 let span_context = tracing::Span::current()
88 .context()
89 .span()
90 .span_context()
91 .clone();
92 if span_context.is_valid() {
93 session.add_notice(AdapterNotice::QueryTrace {
94 trace_id: span_context.trace_id(),
95 });
96 }
97 }
98
99 let catalog = match catalog {
106 Some(c) => c,
107 None => self.catalog_snapshot("try_frontend_peek").await,
108 };
109
110 let (stmt, params, logging, lifecycle_timestamps) = {
112 if let Err(err) = Coordinator::verify_portal(&*catalog, session, portal_name) {
113 outer_ctx_extra
114 .take()
115 .and_then(|guard| guard.defuse().retire());
116 return Err(err);
117 }
118 let portal = session
119 .get_portal_unverified(portal_name)
120 .expect("called verify_portal above");
123 let params = portal.parameters.clone();
124 let stmt = portal.stmt.clone();
125 let logging = Arc::clone(&portal.logging);
126 let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
127 (stmt, params, logging, lifecycle_timestamps)
128 };
129
130 if let Some(ref stmt) = stmt {
133 match &**stmt {
134 Statement::Select(_)
135 | Statement::ExplainAnalyzeObject(_)
136 | Statement::ExplainAnalyzeCluster(_)
137 | Statement::Show(ShowStatement::ShowObjects(_))
138 | Statement::Show(ShowStatement::ShowColumns(_)) => {
139 }
144 Statement::ExplainPlan(explain_stmt) => {
145 match &explain_stmt.explainee {
150 mz_sql_parser::ast::Explainee::Select(..) => {
151 }
153 _ => {
154 debug!(
155 "Bailing out from try_frontend_peek, because EXPLAIN is not for a SELECT query"
156 );
157 return Ok(None);
158 }
159 }
160 }
161 Statement::ExplainPushdown(explain_stmt) => {
162 match &explain_stmt.explainee {
164 mz_sql_parser::ast::Explainee::Select(_, false) => {}
165 _ => {
166 debug!(
167 "Bailing out from try_frontend_peek, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is for EXPLAIN BROKEN"
168 );
169 return Ok(None);
170 }
171 }
172 }
173 Statement::Copy(copy_stmt) => {
174 match ©_stmt.direction {
175 CopyDirection::To => {
176 }
178 CopyDirection::From => {
179 debug!(
180 "Bailing out from try_frontend_peek, because COPY FROM is not supported"
181 );
182 return Ok(None);
183 }
184 }
185 }
186
187 Statement::Subscribe(_)
188 if ENABLE_FRONTEND_SUBSCRIBES.get(catalog.system_config().dyncfgs()) =>
189 {
190 }
192 _ => {
193 debug!(
194 "Bailing out from try_frontend_peek, because statement type is not supported"
195 );
196 return Ok(None);
197 }
198 }
199 }
200
201 let logging_guard = self.begin_statement_logging(
204 session,
205 ¶ms,
206 &logging,
207 &catalog,
208 lifecycle_timestamps,
209 outer_ctx_extra,
210 );
211 let statement_logging_id = logging_guard.id();
212 logging_guard.defuse();
220
221 let result = self
222 .try_frontend_peek_inner(session, catalog, stmt, params, statement_logging_id)
223 .await;
224
225 if let Some(logging_id) = statement_logging_id {
228 let reason = match &result {
229 Ok(Some(
233 ExecuteResponse::SendingRowsStreaming { .. }
234 | ExecuteResponse::Subscribing { .. },
235 )) => {
236 return result;
237 }
238 Ok(Some(resp @ ExecuteResponse::CopyTo { resp: inner, .. })) => {
240 match inner.as_ref() {
241 ExecuteResponse::SendingRowsStreaming { .. }
242 | ExecuteResponse::Subscribing { .. } => {
243 return result;
244 }
245 _ => resp.into(),
248 }
249 }
250 Ok(None) => {
252 soft_panic_or_log!(
253 "Bailed out from `try_frontend_peek_inner` after we already logged the beginning of statement execution."
254 );
255 self.log_ended_execution(
258 logging_id,
259 StatementEndedExecutionReason::Errored {
260 error: "Internal error: bailed out from `try_frontend_peek_inner`"
261 .to_string(),
262 },
263 );
264 return result;
265 }
266 Ok(Some(resp)) => resp.into(),
271 Err(e) => StatementEndedExecutionReason::Errored {
272 error: e.to_string(),
273 },
274 };
275
276 self.log_ended_execution(logging_id, reason);
277 }
278
279 result
280 }
281
282 async fn try_frontend_peek_inner(
285 &mut self,
286 session: &mut Session,
287 catalog: Arc<Catalog>,
288 stmt: Option<Arc<Statement<Raw>>>,
289 params: Params,
290 statement_logging_id: Option<crate::statement_logging::StatementLoggingId>,
291 ) -> Result<Option<ExecuteResponse>, AdapterError> {
292 let stmt = match stmt {
293 Some(stmt) => stmt,
294 None => {
295 debug!("try_frontend_peek_inner succeeded on an empty query");
296 return Ok(Some(ExecuteResponse::EmptyQuery));
297 }
298 };
299
300 session
301 .metrics()
302 .query_total(&[
303 metrics::session_type_label_value(session.user()),
304 metrics::statement_type_label_value(&stmt),
305 ])
306 .inc();
307
308 let conn_catalog = catalog.for_session(session);
311 let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?;
314
315 let pcx = session.pcx();
316 let (plan, sql_impl_ids) =
317 mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, ¶ms, &resolved_ids)?;
318
319 enum QueryPlan<'a> {
321 Select(&'a SelectPlan),
322 CopyTo(&'a SelectPlan, CopyToContext),
323 Subscribe(&'a SubscribePlan),
324 }
325
326 let (query_plan, explain_ctx) = match &plan {
327 Plan::Select(select_plan) => {
328 let explain_ctx = if session.vars().emit_plan_insights_notice() {
329 let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
330 ExplainContext::PlanInsightsNotice(optimizer_trace)
331 } else {
332 ExplainContext::None
333 };
334 (QueryPlan::Select(select_plan), explain_ctx)
335 }
336 Plan::ShowColumns(show_columns_plan) => {
337 (
339 QueryPlan::Select(&show_columns_plan.select_plan),
340 ExplainContext::None,
341 )
342 }
343 Plan::ExplainPlan(plan::ExplainPlanPlan {
344 stage,
345 format,
346 config,
347 explainee: Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc }),
348 }) => {
349 let optimizer_trace = OptimizerTrace::new(stage.paths());
351 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
352 broken: *broken,
353 config: config.clone(),
354 format: *format,
355 stage: *stage,
356 replan: None,
357 desc: Some(desc.clone()),
358 optimizer_trace,
359 });
360 (QueryPlan::Select(plan), explain_ctx)
361 }
362 Plan::CopyTo(plan::CopyToPlan {
364 select_plan,
365 desc,
366 to,
367 connection,
368 connection_id,
369 format,
370 max_file_size,
371 }) => {
372 let uri = eval_copy_to_uri(to.clone(), session, catalog.state())?;
373
374 let copy_to_ctx = CopyToContext {
376 desc: desc.clone(),
377 uri,
378 connection: connection.clone(),
379 connection_id: *connection_id,
380 format: format.clone(),
381 max_file_size: *max_file_size,
382 output_batch_count: None,
383 };
384
385 (
386 QueryPlan::CopyTo(select_plan, copy_to_ctx),
387 ExplainContext::None,
388 )
389 }
390 Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => {
391 match explainee {
393 plan::Explainee::Statement(plan::ExplaineeStatement::Select {
394 broken: false,
395 plan,
396 desc: _,
397 }) => {
398 let explain_ctx = ExplainContext::Pushdown;
399 (QueryPlan::Select(plan), explain_ctx)
400 }
401 _ => {
402 soft_panic_or_log!(
405 "unexpected EXPLAIN FILTER PUSHDOWN plan kind in frontend peek sequencing: {:?}",
406 explainee
407 );
408 debug!(
409 "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN"
410 );
411 return Ok(None);
412 }
413 }
414 }
415 Plan::SideEffectingFunc(sef_plan) => {
416 let response = self
420 .call_coordinator(|tx| Command::ExecuteSideEffectingFunc {
421 plan: sef_plan.clone(),
422 conn_id: session.conn_id().clone(),
423 current_role: session.role_metadata().current_role,
424 tx,
425 })
426 .await?;
427 return Ok(Some(response));
428 }
429 Plan::Subscribe(subscribe) => (QueryPlan::Subscribe(subscribe), ExplainContext::None),
430 _ => {
431 soft_panic_or_log!(
434 "Unexpected plan kind in frontend peek sequencing: {:?}",
435 plan
436 );
437 debug!(
438 "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, side-effecting SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO S3"
439 );
440 return Ok(None);
441 }
442 };
443
444 let when = match query_plan {
445 QueryPlan::Select(s) => &s.when,
446 QueryPlan::CopyTo(s, _) => &s.when,
447 QueryPlan::Subscribe(s) => &s.when,
448 };
449
450 let depends_on = match query_plan {
451 QueryPlan::Select(s) => s.source.depends_on(),
452 QueryPlan::CopyTo(s, _) => s.source.depends_on(),
453 QueryPlan::Subscribe(s) => s.from.depends_on(),
454 };
455
456 let contains_temporal = match query_plan {
457 QueryPlan::Select(s) => s.source.contains_temporal(),
458 QueryPlan::CopyTo(s, _) => s.source.contains_temporal(),
459 QueryPlan::Subscribe(s) => Ok(s.from.contains_temporal()),
460 };
461
462 assert!(plan.allowed_in_read_only());
466
467 let (cluster, target_cluster_id, target_cluster_name) = {
468 let target_cluster = match session.transaction().cluster() {
469 Some(cluster_id) => TargetCluster::Transaction(cluster_id),
471 None => coord::catalog_serving::auto_run_on_catalog_server(
473 &conn_catalog,
474 session,
475 &plan,
476 ),
477 };
478 let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
479 (cluster, cluster.id, &cluster.name)
480 };
481
482 if let Some(logging_id) = &statement_logging_id {
484 self.log_set_cluster(*logging_id, target_cluster_id);
485 }
486
487 coord::catalog_serving::check_cluster_restrictions(
488 target_cluster_name.as_str(),
489 &conn_catalog,
490 &plan,
491 )?;
492
493 rbac::check_plan(
494 &conn_catalog,
495 None::<fn(u32) -> Option<RoleId>>,
498 session,
499 &plan,
500 Some(target_cluster_id),
501 &resolved_ids,
502 &sql_impl_ids,
503 )?;
504
505 if let Some((_, wait_future)) =
506 coord::appends::waiting_on_startup_appends(&*catalog, session, &plan)
507 {
508 wait_future.await;
509 }
510
511 let max_query_result_size = Some(session.vars().max_query_result_size());
512
513 let compute_instance_snapshot =
518 ComputeInstanceSnapshot::new_without_collections(cluster.id());
519
520 let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config())
521 .override_from(&catalog.get_cluster(cluster.id()).config.features())
522 .override_from(&explain_ctx);
523
524 if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
525 return Err(AdapterError::NoClusterReplicasAvailable {
526 name: cluster.name.clone(),
527 is_managed: cluster.is_managed(),
528 });
529 }
530
531 let (_, view_id) = self.transient_id_gen.allocate_id();
532 let (_, index_id) = self.transient_id_gen.allocate_id();
533
534 let target_replica_name = session.vars().cluster_replica();
535 let mut target_replica = target_replica_name
536 .map(|name| {
537 cluster
538 .replica_id(name)
539 .ok_or(AdapterError::UnknownClusterReplica {
540 cluster_name: cluster.name.clone(),
541 replica_name: name.to_string(),
542 })
543 })
544 .transpose()?;
545
546 let source_ids = depends_on;
547 let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?;
551 if matches!(timeline_context, TimelineContext::TimestampIndependent) && contains_temporal? {
552 timeline_context = TimelineContext::TimestampDependent;
556 }
557
558 let notices = coord::sequencer::check_log_reads(
559 &catalog,
560 cluster,
561 &source_ids,
562 &mut target_replica,
563 session.vars(),
564 )?;
565 session.add_notices(notices);
566
567 let isolation_level = session.vars().transaction_isolation().clone();
570 let timeline = Coordinator::get_timeline(&timeline_context);
571 let needs_linearized_read_ts =
572 Coordinator::needs_linearized_read_ts(&isolation_level, when);
573
574 let oracle_read_ts = match timeline {
575 Some(timeline) if needs_linearized_read_ts => {
576 let oracle = self.ensure_oracle(timeline).await?;
577 let oracle_read_ts = oracle.read_ts().await;
578 Some(oracle_read_ts)
579 }
580 Some(_) | None => None,
581 };
582
583 let vars = session.vars();
586 let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
587 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
588 && !session.contains_read_timestamp()
589 {
590 self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
592 source_ids: source_ids.clone(),
593 real_time_recency_timeout: *vars.real_time_recency_timeout(),
594 tx,
595 })
596 .await?
597 } else {
598 None
599 };
600
601 let dataflow_builder =
604 DataflowBuilder::new(catalog.state(), compute_instance_snapshot.clone());
605 let input_id_bundle = dataflow_builder.sufficient_collections(source_ids.clone());
606
607 let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when)
620 && !matches!(query_plan, QueryPlan::Subscribe { .. });
621
622 let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
624 Some(
627 determination @ TimestampDetermination {
628 timestamp_context: TimestampContext::TimelineTimestamp { .. },
629 ..
630 },
631 ) if in_immediate_multi_stmt_txn => {
632 let txn_read_holds_opt = self
640 .call_coordinator(|tx| Command::GetTransactionReadHoldsBundle {
641 conn_id: session.conn_id().clone(),
642 tx,
643 })
644 .await;
645
646 if let Some(txn_read_holds) = txn_read_holds_opt {
647 let allowed_id_bundle = txn_read_holds.id_bundle();
648 let outside = input_id_bundle.difference(&allowed_id_bundle);
649
650 if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
652 let valid_names =
653 allowed_id_bundle.resolve_names(&*catalog, session.conn_id());
654 let invalid_names = outside.resolve_names(&*catalog, session.conn_id());
655 return Err(AdapterError::RelationOutsideTimeDomain {
656 relations: invalid_names,
657 names: valid_names,
658 });
659 }
660
661 let read_holds = txn_read_holds.subset(&input_id_bundle);
663
664 (determination, read_holds)
665 } else {
666 return Err(AdapterError::Internal(
671 "Missing transaction read holds for multi-statement transaction"
672 .to_string(),
673 ));
674 }
675 }
676 _ => {
677 let timedomain_bundle;
684 let determine_bundle = if in_immediate_multi_stmt_txn {
685 timedomain_bundle = timedomain_for(
689 &*catalog,
690 &dataflow_builder,
691 &source_ids,
692 &timeline_context,
693 session.conn_id(),
694 target_cluster_id,
695 )?;
696 &timedomain_bundle
697 } else {
698 &input_id_bundle
700 };
701 let (determination, read_holds) = self
702 .frontend_determine_timestamp(
703 session,
704 determine_bundle,
705 when,
706 target_cluster_id,
707 &timeline_context,
708 oracle_read_ts,
709 real_time_recency_ts,
710 )
711 .await?;
712
713 if in_immediate_multi_stmt_txn {
717 self.call_coordinator(|tx| Command::StoreTransactionReadHolds {
718 conn_id: session.conn_id().clone(),
719 read_holds: read_holds.clone(),
720 tx,
721 })
722 .await;
723 }
724
725 (determination, read_holds)
726 }
727 };
728
729 {
730 for id in input_id_bundle.iter() {
732 let s = read_holds.storage_holds.contains_key(&id);
733 let c = read_holds
734 .compute_ids()
735 .map(|(_instance, coll)| coll)
736 .contains(&id);
737 soft_assert_or_log!(
738 s || c,
739 "missing read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
740 id,
741 in_immediate_multi_stmt_txn,
742 );
743 }
744
745 for id in input_id_bundle.storage_ids.iter() {
748 soft_assert_or_log!(
749 read_holds.storage_holds.contains_key(id),
750 "missing storage read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
751 id,
752 in_immediate_multi_stmt_txn,
753 );
754 }
755 for id in input_id_bundle
756 .compute_ids
757 .iter()
758 .flat_map(|(_instance, colls)| colls)
759 {
760 soft_assert_or_log!(
761 read_holds
762 .compute_ids()
763 .map(|(_instance, coll)| coll)
764 .contains(id),
765 "missing compute read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
766 id,
767 in_immediate_multi_stmt_txn,
768 );
769 }
770 }
771
772 let requires_linearization = (&explain_ctx).into();
785 let mut transaction_determination = determination.clone();
786 match query_plan {
787 QueryPlan::Subscribe { .. } => {
788 if when.is_transactional() {
789 session.add_transaction_ops(TransactionOps::Subscribe)?;
790 }
791 }
792 QueryPlan::Select(..) | QueryPlan::CopyTo(..) => {
793 if when.is_transactional() {
794 session.add_transaction_ops(TransactionOps::Peeks {
795 determination: transaction_determination,
796 cluster_id: target_cluster_id,
797 requires_linearization,
798 })?;
799 } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
800 transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
802 session.add_transaction_ops(TransactionOps::Peeks {
803 determination: transaction_determination,
804 cluster_id: target_cluster_id,
805 requires_linearization,
806 })?;
807 }
808 }
809 }
810
811 let stats = statistics_oracle(
814 session,
815 &source_ids,
816 &determination.timestamp_context.antichain(),
817 true,
818 catalog.system_config(),
819 &*self.storage_collections,
820 )
821 .await
822 .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
823
824 let timestamp_context = determination.timestamp_context.clone();
827 let session_meta = session.meta();
828 let now = catalog.config().now.clone();
829 let target_cluster_name = target_cluster_name.clone();
830 let needs_plan_insights = explain_ctx.needs_plan_insights();
831 let determination_for_pushdown = if matches!(explain_ctx, ExplainContext::Pushdown) {
832 Some(determination.clone())
835 } else {
836 None
837 };
838
839 let span = Span::current();
840
841 let catalog_for_insights = if needs_plan_insights {
843 Some(Arc::clone(&catalog))
844 } else {
845 None
846 };
847 let mut compute_instances = BTreeMap::new();
848 if needs_plan_insights {
849 for user_cluster in catalog.user_clusters() {
850 let snapshot = ComputeInstanceSnapshot::new_without_collections(user_cluster.id);
851 compute_instances.insert(user_cluster.name.clone(), snapshot);
852 }
853 }
854
855 let source_ids_for_closure = source_ids.clone();
856
857 let optimization_future: JoinHandle<Result<_, AdapterError>> = match query_plan {
858 QueryPlan::CopyTo(select_plan, mut copy_to_ctx) => {
859 let raw_expr = select_plan.source.clone();
860
861 let worker_counts = cluster.replicas().map(|r| {
863 let loc = &r.config.location;
864 loc.workers().unwrap_or_else(|| loc.num_processes())
865 });
866 let max_worker_count = match worker_counts.max() {
867 Some(count) => u64::cast_from(count),
868 None => {
869 return Err(AdapterError::NoClusterReplicasAvailable {
870 name: cluster.name.clone(),
871 is_managed: cluster.is_managed(),
872 });
873 }
874 };
875 copy_to_ctx.output_batch_count = Some(max_worker_count);
876
877 let mut optimizer = optimize::copy_to::Optimizer::new(
878 Arc::clone(&catalog),
879 compute_instance_snapshot,
880 view_id,
881 copy_to_ctx,
882 optimizer_config,
883 self.optimizer_metrics.clone(),
884 );
885
886 mz_ore::task::spawn_blocking(
887 || "optimize copy-to",
888 move || {
889 span.in_scope(|| {
890 let _dispatch_guard = explain_ctx.dispatch_guard();
891
892 let local_mir_plan =
895 optimizer.catch_unwind_optimize(raw_expr.clone())?;
896 let local_mir_plan = local_mir_plan.resolve(
898 timestamp_context.clone(),
899 &session_meta,
900 stats,
901 );
902 let global_lir_plan =
904 optimizer.catch_unwind_optimize(local_mir_plan)?;
905 Ok(Execution::CopyToS3 {
906 global_lir_plan,
907 source_ids: source_ids_for_closure,
908 })
909 })
910 },
911 )
912 }
913 QueryPlan::Select(select_plan) => {
914 let select_plan = select_plan.clone();
915 let raw_expr = select_plan.source.clone();
916
917 let mut optimizer = optimize::peek::Optimizer::new(
919 Arc::clone(&catalog),
920 compute_instance_snapshot,
921 select_plan.finishing.clone(),
922 view_id,
923 index_id,
924 optimizer_config,
925 self.optimizer_metrics.clone(),
926 );
927
928 mz_ore::task::spawn_blocking(
929 || "optimize peek",
930 move || {
931 span.in_scope(|| {
932 let _dispatch_guard = explain_ctx.dispatch_guard();
933
934 let pipeline = || {
941 let local_mir_plan =
942 optimizer.catch_unwind_optimize(raw_expr.clone())?;
943 let local_mir_plan = local_mir_plan.resolve(
945 timestamp_context.clone(),
946 &session_meta,
947 stats,
948 );
949 let global_lir_plan =
951 optimizer.catch_unwind_optimize(local_mir_plan)?;
952 Ok::<_, AdapterError>(global_lir_plan)
953 };
954
955 let global_lir_plan_result = pipeline();
956 let optimization_finished_at = now();
957
958 let create_insights_ctx =
959 |optimizer: &optimize::peek::Optimizer,
960 is_notice: bool|
961 -> Option<Box<PlanInsightsContext>> {
962 if !needs_plan_insights {
963 return None;
964 }
965
966 let catalog = catalog_for_insights.as_ref()?;
967
968 let enable_re_optimize = if needs_plan_insights {
969 let dyncfgs = catalog.system_config().dyncfgs();
977 let opt_limit = mz_adapter_types::dyncfgs
978 ::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
979 .get(dyncfgs);
980 !(is_notice && optimizer.duration() > opt_limit)
981 } else {
982 false
983 };
984
985 Some(Box::new(PlanInsightsContext {
986 stmt: select_plan
987 .select
988 .as_deref()
989 .map(Clone::clone)
990 .map(Statement::Select),
991 raw_expr: raw_expr.clone(),
992 catalog: Arc::clone(catalog),
993 compute_instances,
994 target_instance: target_cluster_name,
995 metrics: optimizer.metrics().clone(),
996 finishing: optimizer.finishing().clone(),
997 optimizer_config: optimizer.config().clone(),
998 session: session_meta,
999 timestamp_context,
1000 view_id: optimizer.select_id(),
1001 index_id: optimizer.index_id(),
1002 enable_re_optimize,
1003 }))
1004 };
1005
1006 let global_lir_plan = match global_lir_plan_result {
1007 Ok(plan) => plan,
1008 Err(err) => {
1009 let result = if let ExplainContext::Plan(explain_ctx) =
1010 explain_ctx
1011 && explain_ctx.broken
1012 {
1013 tracing::error!(
1015 "error while handling EXPLAIN statement: {}",
1016 err
1017 );
1018 Ok(Execution::ExplainPlan {
1019 df_meta: Default::default(),
1020 explain_ctx,
1021 optimizer,
1022 insights_ctx: None,
1023 })
1024 } else {
1025 Err(err)
1026 };
1027 return result;
1028 }
1029 };
1030
1031 match explain_ctx {
1032 ExplainContext::Plan(explain_ctx) => {
1033 let (_, df_meta, _) = global_lir_plan.unapply();
1034 let insights_ctx = create_insights_ctx(&optimizer, false);
1035 Ok(Execution::ExplainPlan {
1036 df_meta,
1037 explain_ctx,
1038 optimizer,
1039 insights_ctx,
1040 })
1041 }
1042 ExplainContext::None => Ok(Execution::Peek {
1043 global_lir_plan,
1044 optimization_finished_at,
1045 plan_insights_optimizer_trace: None,
1046 finishing: select_plan.finishing,
1047 copy_to: select_plan.copy_to,
1048 insights_ctx: None,
1049 }),
1050 ExplainContext::PlanInsightsNotice(optimizer_trace) => {
1051 let insights_ctx = create_insights_ctx(&optimizer, true);
1052 Ok(Execution::Peek {
1053 global_lir_plan,
1054 optimization_finished_at,
1055 plan_insights_optimizer_trace: Some(optimizer_trace),
1056 finishing: select_plan.finishing,
1057 copy_to: select_plan.copy_to,
1058 insights_ctx,
1059 })
1060 }
1061 ExplainContext::Pushdown => {
1062 let (plan, _, _) = global_lir_plan.unapply();
1063 let imports = match plan {
1064 PeekPlan::SlowPath(plan) => plan
1065 .desc
1066 .source_imports
1067 .into_iter()
1068 .filter_map(|(id, import)| {
1069 import.desc.arguments.operators.map(|mfp| (id, mfp))
1070 })
1071 .collect(),
1072 PeekPlan::FastPath(_) => {
1073 std::collections::BTreeMap::default()
1074 }
1075 };
1076 Ok(Execution::ExplainPushdown {
1077 imports,
1078 determination: determination_for_pushdown
1079 .expect("it's present for the ExplainPushdown case"),
1080 })
1081 }
1082 }
1083 })
1084 },
1085 )
1086 }
1087 QueryPlan::Subscribe(plan) => {
1088 let plan = plan.clone();
1089 let catalog: Arc<Catalog> = Arc::clone(&catalog);
1090 let debug_name = format!("subscribe-{}", index_id);
1091 let mut optimizer = optimize::subscribe::Optimizer::new(
1092 catalog,
1093 compute_instance_snapshot.clone(),
1094 view_id,
1095 index_id,
1096 plan.with_snapshot,
1097 plan.up_to,
1098 debug_name,
1099 optimizer_config,
1100 self.optimizer_metrics.clone(),
1101 );
1102 mz_ore::task::spawn_blocking(
1103 || "optimize subscribe",
1104 move || {
1105 span.in_scope(|| {
1106 let _dispatch_guard = explain_ctx.dispatch_guard();
1107
1108 let global_mir_plan = optimizer.catch_unwind_optimize(plan.clone())?;
1109 let as_of = timestamp_context.timestamp_or_default();
1110
1111 if let Some(up_to) = optimizer.up_to() {
1112 if as_of > up_to {
1113 return Err(AdapterError::AbsurdSubscribeBounds {
1114 as_of,
1115 up_to,
1116 });
1117 }
1118 }
1119 let local_mir_plan =
1120 global_mir_plan.resolve(Antichain::from_elem(as_of));
1121
1122 let global_lir_plan =
1123 optimizer.catch_unwind_optimize(local_mir_plan)?;
1124 let optimization_finished_at = now();
1125
1126 let (df_desc, df_meta) = global_lir_plan.unapply();
1127 Ok(Execution::Subscribe {
1128 subscribe_plan: plan,
1129 df_desc,
1130 df_meta,
1131 optimization_finished_at,
1132 })
1133 })
1134 },
1135 )
1136 }
1137 };
1138
1139 let mut optimization_timeout = *session.vars().statement_timeout();
1140 if optimization_timeout == Duration::ZERO {
1142 optimization_timeout = Duration::MAX;
1143 }
1144 let optimization_result =
1145 match tokio::time::timeout(optimization_timeout, optimization_future).await {
1150 Ok(Ok(result)) => result,
1151 Ok(Err(AdapterError::Optimizer(err))) => {
1152 return Err(AdapterError::Internal(format!(
1153 "internal error in optimizer: {}",
1154 err
1155 )));
1156 }
1157 Ok(Err(err)) => {
1158 return Err(err);
1159 }
1160 Err(_elapsed) => {
1161 warn!("optimize peek timed out after {:?}", optimization_timeout);
1162 return Err(AdapterError::StatementTimeout);
1163 }
1164 };
1165
1166 if let Some(logging_id) = &statement_logging_id {
1168 self.log_lifecycle_event(*logging_id, StatementLifecycleEvent::OptimizationFinished);
1169 }
1170
1171 Self::assert_read_holds_correct(
1173 &read_holds,
1174 &optimization_result,
1175 &determination,
1176 target_cluster_id,
1177 in_immediate_multi_stmt_txn,
1178 );
1179
1180 match optimization_result {
1182 Execution::ExplainPlan {
1183 df_meta,
1184 explain_ctx,
1185 optimizer,
1186 insights_ctx,
1187 } => {
1188 let rows = coord::sequencer::explain_plan_inner(
1189 session,
1190 &catalog,
1191 df_meta,
1192 explain_ctx,
1193 optimizer,
1194 insights_ctx,
1195 )
1196 .await?;
1197
1198 Ok(Some(ExecuteResponse::SendingRowsImmediate {
1199 rows: Box::new(rows.into_row_iter()),
1200 }))
1201 }
1202 Execution::ExplainPushdown {
1203 imports,
1204 determination,
1205 } => {
1206 let as_of = determination.timestamp_context.antichain();
1209 let mz_now = determination
1210 .timestamp_context
1211 .timestamp()
1212 .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1213 .unwrap_or_else(ResultSpec::value_all);
1214
1215 Ok(Some(
1216 coord::sequencer::explain_pushdown_future_inner(
1217 session,
1218 &*catalog,
1219 &self.storage_collections,
1220 as_of,
1221 mz_now,
1222 imports,
1223 )
1224 .await
1225 .await?,
1226 ))
1227 }
1228 Execution::Peek {
1229 global_lir_plan,
1230 optimization_finished_at: _optimization_finished_at,
1231 plan_insights_optimizer_trace,
1232 finishing,
1233 copy_to,
1234 insights_ctx,
1235 } => {
1236 let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
1241
1242 coord::sequencer::emit_optimizer_notices(
1243 &*catalog,
1244 session,
1245 &df_meta.optimizer_notices,
1246 );
1247
1248 if let Some(trace) = plan_insights_optimizer_trace {
1250 let target_cluster = catalog.get_cluster(target_cluster_id);
1251 let features = OptimizerFeatures::from(catalog.system_config())
1252 .override_from(&target_cluster.config.features());
1253 let insights = trace
1254 .into_plan_insights(
1255 &features,
1256 &catalog.for_session(session),
1257 Some(finishing.clone()),
1258 Some(target_cluster),
1259 df_meta.clone(),
1260 insights_ctx,
1261 )
1262 .await?;
1263 session.add_notice(AdapterNotice::PlanInsights(insights));
1264 }
1265
1266 let watch_set = statement_logging_id.map(|logging_id| {
1269 WatchSetCreation::new(
1270 logging_id,
1271 catalog.state(),
1272 &input_id_bundle,
1273 determination.timestamp_context.timestamp_or_default(),
1274 )
1275 });
1276
1277 let max_result_size = catalog.system_config().max_result_size();
1278
1279 let determination_for_notice = if session.vars().emit_timestamp_notice() {
1282 Some(determination.clone())
1283 } else {
1284 None
1285 };
1286
1287 let response = match peek_plan {
1288 PeekPlan::FastPath(fast_path_plan) => {
1289 if let Some(logging_id) = &statement_logging_id {
1290 if !matches!(fast_path_plan, FastPathPlan::Constant(..)) {
1304 self.log_set_timestamp(
1305 *logging_id,
1306 determination.timestamp_context.timestamp_or_default(),
1307 );
1308 }
1309 }
1310
1311 let row_set_finishing_seconds =
1312 session.metrics().row_set_finishing_seconds().clone();
1313
1314 let peek_stash_read_batch_size_bytes =
1315 mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
1316 .get(catalog.system_config().dyncfgs());
1317 let peek_stash_read_memory_budget_bytes =
1318 mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
1319 .get(catalog.system_config().dyncfgs());
1320
1321 self.implement_fast_path_peek_plan(
1322 fast_path_plan,
1323 determination.timestamp_context.timestamp_or_default(),
1324 finishing,
1325 target_cluster_id,
1326 target_replica,
1327 typ,
1328 max_result_size,
1329 max_query_result_size,
1330 row_set_finishing_seconds,
1331 read_holds,
1332 peek_stash_read_batch_size_bytes,
1333 peek_stash_read_memory_budget_bytes,
1334 session.conn_id().clone(),
1335 source_ids,
1336 watch_set,
1337 )
1338 .await?
1339 }
1340 PeekPlan::SlowPath(dataflow_plan) => {
1341 if let Some(logging_id) = &statement_logging_id {
1342 self.log_set_transient_index_id(*logging_id, dataflow_plan.id);
1343 }
1344
1345 self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
1346 dataflow_plan: Box::new(dataflow_plan),
1347 determination,
1348 finishing,
1349 compute_instance: target_cluster_id,
1350 target_replica,
1351 intermediate_result_type: typ,
1352 source_ids,
1353 conn_id: session.conn_id().clone(),
1354 max_result_size,
1355 max_query_result_size,
1356 watch_set,
1357 tx,
1358 })
1359 .await?
1360 }
1361 };
1362
1363 if let Some(determination) = determination_for_notice {
1365 let explanation = self
1366 .call_coordinator(|tx| Command::ExplainTimestamp {
1367 conn_id: session.conn_id().clone(),
1368 session_wall_time: session.pcx().wall_time,
1369 cluster_id: target_cluster_id,
1370 id_bundle: input_id_bundle.clone(),
1371 determination,
1372 tx,
1373 })
1374 .await;
1375 session.add_notice(AdapterNotice::QueryTimestamp { explanation });
1376 }
1377
1378 Ok(Some(match copy_to {
1379 None => response,
1380 Some(format) => ExecuteResponse::CopyTo {
1382 format,
1383 resp: Box::new(response),
1384 },
1385 }))
1386 }
1387 Execution::Subscribe {
1388 subscribe_plan,
1389 df_desc,
1390 df_meta,
1391 optimization_finished_at: _optimization_finished_at,
1392 } => {
1393 if df_desc.as_of.as_ref().expect("as of set") == &df_desc.until {
1394 session.add_notice(AdapterNotice::EqualSubscribeBounds {
1395 bound: *df_desc.until.as_option().expect("as of set"),
1396 });
1397 }
1398 coord::sequencer::emit_optimizer_notices(
1399 &*catalog,
1400 session,
1401 &df_meta.optimizer_notices,
1402 );
1403
1404 let response = self
1405 .call_coordinator(|tx| Command::ExecuteSubscribe {
1406 df_desc,
1407 dependency_ids: subscribe_plan.from.depends_on(),
1408 cluster_id: target_cluster_id,
1409 replica_id: target_replica,
1410 conn_id: session.conn_id().clone(),
1411 session_uuid: session.uuid(),
1412 read_holds,
1413 plan: subscribe_plan,
1414 statement_logging_id,
1415 tx,
1416 })
1417 .await?;
1418 Ok(Some(response))
1419 }
1420 Execution::CopyToS3 {
1421 global_lir_plan,
1422 source_ids,
1423 } => {
1424 let (df_desc, df_meta) = global_lir_plan.unapply();
1425
1426 coord::sequencer::emit_optimizer_notices(
1427 &*catalog,
1428 session,
1429 &df_meta.optimizer_notices,
1430 );
1431
1432 let sink_id = df_desc.sink_id();
1434 let sinks = &df_desc.sink_exports;
1435 if sinks.len() != 1 {
1436 return Err(AdapterError::Internal(
1437 "expected exactly one copy to s3 sink".into(),
1438 ));
1439 }
1440 let (_, sink_desc) = sinks
1441 .first_key_value()
1442 .expect("known to be exactly one copy to s3 sink");
1443 let s3_sink_connection = match &sink_desc.connection {
1444 mz_compute_types::sinks::ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1445 conn.clone()
1446 }
1447 _ => {
1448 return Err(AdapterError::Internal(
1449 "expected copy to s3 oneshot sink".into(),
1450 ));
1451 }
1452 };
1453
1454 self.call_coordinator(|tx| Command::CopyToPreflight {
1457 s3_sink_connection,
1458 sink_id,
1459 tx,
1460 })
1461 .await?;
1462
1463 let watch_set = statement_logging_id.map(|logging_id| {
1465 WatchSetCreation::new(
1466 logging_id,
1467 catalog.state(),
1468 &input_id_bundle,
1469 determination.timestamp_context.timestamp_or_default(),
1470 )
1471 });
1472
1473 let response = self
1474 .call_coordinator(|tx| Command::ExecuteCopyTo {
1475 df_desc: Box::new(df_desc),
1476 compute_instance: target_cluster_id,
1477 target_replica,
1478 source_ids,
1479 conn_id: session.conn_id().clone(),
1480 watch_set,
1481 tx,
1482 })
1483 .await?;
1484
1485 Ok(Some(response))
1486 }
1487 }
1488 }
1489
1490 pub(crate) async fn frontend_determine_timestamp(
1497 &mut self,
1498 session: &Session,
1499 id_bundle: &CollectionIdBundle,
1500 when: &QueryWhen,
1501 compute_instance: ComputeInstanceId,
1502 timeline_context: &TimelineContext,
1503 oracle_read_ts: Option<Timestamp>,
1504 real_time_recency_ts: Option<Timestamp>,
1505 ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
1506 let isolation_level = session.vars().transaction_isolation();
1509
1510 let (read_holds, upper) = self
1511 .acquire_read_holds_and_least_valid_write(id_bundle)
1512 .await
1513 .map_err(|err| {
1514 AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
1515 err,
1516 compute_instance,
1517 )
1518 })?;
1519 let (det, read_holds) = <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1520 session,
1521 id_bundle,
1522 when,
1523 timeline_context,
1524 oracle_read_ts,
1525 real_time_recency_ts,
1526 isolation_level,
1527 read_holds,
1528 upper.clone(),
1529 )?;
1530
1531 session
1532 .metrics()
1533 .determine_timestamp(&[
1534 match det.respond_immediately() {
1535 true => "true",
1536 false => "false",
1537 },
1538 isolation_level.as_str(),
1539 &compute_instance.to_string(),
1540 ])
1541 .inc();
1542 if !det.respond_immediately()
1543 && isolation_level == &IsolationLevel::StrictSerializable
1544 && real_time_recency_ts.is_none()
1545 {
1546 if let Some(strict) = det.timestamp_context.timestamp() {
1548 let (serializable_det, _tmp_read_holds) =
1549 <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1550 session,
1551 id_bundle,
1552 when,
1553 timeline_context,
1554 oracle_read_ts,
1555 real_time_recency_ts,
1556 &IsolationLevel::Serializable,
1557 read_holds.clone(),
1558 upper,
1559 )?;
1560 if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
1561 session
1562 .metrics()
1563 .timestamp_difference_for_strict_serializable_ms(&[compute_instance
1564 .to_string()
1565 .as_ref()])
1566 .observe(f64::cast_lossy(u64::from(
1567 strict.saturating_sub(*serializable),
1568 )));
1569 }
1570 }
1571 }
1572
1573 Ok((det, read_holds))
1574 }
1575
1576 fn assert_read_holds_correct(
1577 read_holds: &ReadHolds,
1578 execution: &Execution,
1579 determination: &TimestampDetermination,
1580 target_cluster_id: ClusterId,
1581 in_immediate_multi_stmt_txn: bool,
1582 ) {
1583 let (source_imports, index_imports, as_of, execution_name): (
1585 Vec<GlobalId>,
1586 Vec<GlobalId>,
1587 Timestamp,
1588 &str,
1589 ) = match execution {
1590 Execution::Peek {
1591 global_lir_plan, ..
1592 } => match global_lir_plan.peek_plan() {
1593 PeekPlan::FastPath(fast_path_plan) => {
1594 let (sources, indexes) = match fast_path_plan {
1595 FastPathPlan::Constant(..) => (vec![], vec![]),
1596 FastPathPlan::PeekExisting(_coll_id, idx_id, ..) => (vec![], vec![*idx_id]),
1597 FastPathPlan::PeekPersist(global_id, ..) => (vec![*global_id], vec![]),
1598 };
1599 (
1600 sources,
1601 indexes,
1602 determination.timestamp_context.timestamp_or_default(),
1603 "FastPath",
1604 )
1605 }
1606 PeekPlan::SlowPath(dataflow_plan) => {
1607 let as_of = dataflow_plan
1608 .desc
1609 .as_of
1610 .clone()
1611 .expect("dataflow has an as_of")
1612 .into_element();
1613 (
1614 dataflow_plan.desc.source_imports.keys().cloned().collect(),
1615 dataflow_plan.desc.index_imports.keys().cloned().collect(),
1616 as_of,
1617 "SlowPath",
1618 )
1619 }
1620 },
1621 Execution::CopyToS3 {
1622 global_lir_plan, ..
1623 } => {
1624 let df_desc = global_lir_plan.df_desc();
1625 let as_of = df_desc
1626 .as_of
1627 .clone()
1628 .expect("dataflow has an as_of")
1629 .into_element();
1630 (
1631 df_desc.source_imports.keys().cloned().collect(),
1632 df_desc.index_imports.keys().cloned().collect(),
1633 as_of,
1634 "CopyToS3",
1635 )
1636 }
1637 Execution::ExplainPlan { .. } | Execution::ExplainPushdown { .. } => {
1638 return;
1640 }
1641 Execution::Subscribe { df_desc, .. } => {
1642 let as_of = df_desc
1643 .as_of
1644 .clone()
1645 .expect("dataflow has an as_of")
1646 .into_element();
1647 (
1648 df_desc.source_imports.keys().cloned().collect(),
1649 df_desc.index_imports.keys().cloned().collect(),
1650 as_of,
1651 "Subscribe",
1652 )
1653 }
1654 };
1655
1656 for id in source_imports.iter() {
1658 soft_assert_or_log!(
1659 read_holds.storage_holds.contains_key(id),
1660 "[{}] missing read hold for the source import {}; (in_immediate_multi_stmt_txn: {})",
1661 execution_name,
1662 id,
1663 in_immediate_multi_stmt_txn,
1664 );
1665 }
1666 for id in index_imports.iter() {
1667 soft_assert_or_log!(
1668 read_holds
1669 .compute_ids()
1670 .map(|(_instance, coll)| coll)
1671 .contains(id),
1672 "[{}] missing read hold for the index import {}; (in_immediate_multi_stmt_txn: {})",
1673 execution_name,
1674 id,
1675 in_immediate_multi_stmt_txn,
1676 );
1677 }
1678
1679 for (id, h) in read_holds.storage_holds.iter() {
1681 soft_assert_or_log!(
1682 h.since().less_equal(&as_of),
1683 "[{}] storage read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1684 execution_name,
1685 h.since(),
1686 id,
1687 as_of,
1688 determination,
1689 in_immediate_multi_stmt_txn,
1690 );
1691 }
1692 for ((instance, id), h) in read_holds.compute_holds.iter() {
1693 soft_assert_eq_or_log!(
1694 *instance,
1695 target_cluster_id,
1696 "[{}] the read hold on {} is on the wrong cluster; (in_immediate_multi_stmt_txn: {})",
1697 execution_name,
1698 id,
1699 in_immediate_multi_stmt_txn,
1700 );
1701 soft_assert_or_log!(
1702 h.since().less_equal(&as_of),
1703 "[{}] compute read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1704 execution_name,
1705 h.since(),
1706 id,
1707 as_of,
1708 determination,
1709 in_immediate_multi_stmt_txn,
1710 );
1711 }
1712 }
1713}
1714
1715enum Execution {
1717 Peek {
1718 global_lir_plan: optimize::peek::GlobalLirPlan,
1719 optimization_finished_at: EpochMillis,
1720 plan_insights_optimizer_trace: Option<OptimizerTrace>,
1721 finishing: RowSetFinishing,
1722 copy_to: Option<plan::CopyFormat>,
1723 insights_ctx: Option<Box<PlanInsightsContext>>,
1724 },
1725 Subscribe {
1726 subscribe_plan: SubscribePlan,
1727 df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
1728 df_meta: DataflowMetainfo,
1729 optimization_finished_at: EpochMillis,
1730 },
1731 CopyToS3 {
1732 global_lir_plan: optimize::copy_to::GlobalLirPlan,
1733 source_ids: BTreeSet<GlobalId>,
1734 },
1735 ExplainPlan {
1736 df_meta: DataflowMetainfo,
1737 explain_ctx: ExplainPlanContext,
1738 optimizer: optimize::peek::Optimizer,
1739 insights_ctx: Option<Box<PlanInsightsContext>>,
1740 },
1741 ExplainPushdown {
1742 imports: BTreeMap<GlobalId, mz_expr::MapFilterProject>,
1743 determination: TimestampDetermination,
1744 },
1745}