1use std::collections::BTreeMap;
11use std::collections::BTreeSet;
12use std::sync::Arc;
13use std::time::Duration;
14
15use itertools::Itertools;
16use mz_adapter_types::dyncfgs::ENABLE_FRONTEND_SUBSCRIBES;
17use mz_compute_types::ComputeInstanceId;
18use mz_compute_types::dataflows::DataflowDescription;
19use mz_controller_types::ClusterId;
20use mz_expr::{CollectionPlan, ResultSpec, RowSetFinishing};
21use mz_ore::cast::{CastFrom, CastLossy};
22use mz_ore::collections::CollectionExt;
23use mz_ore::now::EpochMillis;
24use mz_ore::task::JoinHandle;
25use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
26use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
27use mz_repr::role_id::RoleId;
28use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
29use mz_sql::ast::Raw;
30use mz_sql::catalog::CatalogCluster;
31use mz_sql::plan::Params;
32use mz_sql::plan::{
33 self, Explainee, ExplaineeStatement, Plan, QueryWhen, SelectPlan, SubscribePlan,
34};
35use mz_sql::rbac;
36use mz_sql::session::metadata::SessionMetadata;
37use mz_sql::session::vars::IsolationLevel;
38use mz_sql_parser::ast::{CopyDirection, ExplainStage, ShowStatement, Statement};
39use mz_transform::EmptyStatisticsOracle;
40use mz_transform::dataflow::DataflowMetainfo;
41use opentelemetry::trace::TraceContextExt;
42use timely::progress::Antichain;
43use tracing::{Span, debug, warn};
44use tracing_opentelemetry::OpenTelemetrySpanExt;
45
46use crate::catalog::Catalog;
47use crate::command::Command;
48use crate::coord::peek::{FastPathPlan, PeekPlan};
49use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle};
50use crate::coord::timeline::timedomain_for;
51use crate::coord::timestamp_selection::TimestampDetermination;
52use crate::coord::{
53 Coordinator, CopyToContext, ExecuteContextGuard, ExplainContext, ExplainPlanContext,
54 TargetCluster,
55};
56use crate::explain::insights::PlanInsightsContext;
57use crate::explain::optimizer_trace::OptimizerTrace;
58use crate::optimize::Optimize;
59use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
60use crate::session::{Session, TransactionOps, TransactionStatus};
61use crate::statement_logging::WatchSetCreation;
62use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
63use crate::{
64 AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, PeekClient, ReadHolds,
65 TimelineContext, TimestampContext, TimestampProvider, optimize,
66};
67use crate::{coord, metrics};
68
69impl PeekClient {
70 pub(crate) async fn try_frontend_peek(
78 &mut self,
79 portal_name: &str,
80 catalog: Option<Arc<Catalog>>,
81 session: &mut Session,
82 outer_ctx_extra: &mut Option<ExecuteContextGuard>,
83 ) -> Result<Option<ExecuteResponse>, AdapterError> {
84 if session.vars().emit_trace_id_notice() {
87 let span_context = tracing::Span::current()
88 .context()
89 .span()
90 .span_context()
91 .clone();
92 if span_context.is_valid() {
93 session.add_notice(AdapterNotice::QueryTrace {
94 trace_id: span_context.trace_id(),
95 });
96 }
97 }
98
99 let catalog = match catalog {
106 Some(c) => c,
107 None => self.catalog_snapshot("try_frontend_peek").await,
108 };
109
110 let (stmt, params, logging, lifecycle_timestamps) = {
112 if let Err(err) = Coordinator::verify_portal(&*catalog, session, portal_name) {
113 outer_ctx_extra
114 .take()
115 .and_then(|guard| guard.defuse().retire());
116 return Err(err);
117 }
118 let portal = session
119 .get_portal_unverified(portal_name)
120 .expect("called verify_portal above");
123 let params = portal.parameters.clone();
124 let stmt = portal.stmt.clone();
125 let logging = Arc::clone(&portal.logging);
126 let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
127 (stmt, params, logging, lifecycle_timestamps)
128 };
129
130 if let Some(ref stmt) = stmt {
133 match &**stmt {
134 Statement::Select(_)
135 | Statement::ExplainAnalyzeObject(_)
136 | Statement::ExplainAnalyzeCluster(_)
137 | Statement::Show(ShowStatement::ShowObjects(_))
138 | Statement::Show(ShowStatement::ShowColumns(_)) => {
139 }
144 Statement::ExplainPlan(explain_stmt) => {
145 match &explain_stmt.explainee {
150 mz_sql_parser::ast::Explainee::Select(..) => {
151 }
153 _ => {
154 debug!(
155 "Bailing out from try_frontend_peek, because EXPLAIN is not for a SELECT query"
156 );
157 return Ok(None);
158 }
159 }
160 }
161 Statement::ExplainPushdown(explain_stmt) => {
162 match &explain_stmt.explainee {
164 mz_sql_parser::ast::Explainee::Select(_, false) => {}
165 _ => {
166 debug!(
167 "Bailing out from try_frontend_peek, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is for EXPLAIN BROKEN"
168 );
169 return Ok(None);
170 }
171 }
172 }
173 Statement::Copy(copy_stmt) => {
174 match ©_stmt.direction {
175 CopyDirection::To => {
176 }
178 CopyDirection::From => {
179 debug!(
180 "Bailing out from try_frontend_peek, because COPY FROM is not supported"
181 );
182 return Ok(None);
183 }
184 }
185 }
186
187 Statement::Subscribe(_)
188 if ENABLE_FRONTEND_SUBSCRIBES.get(catalog.system_config().dyncfgs()) =>
189 {
190 }
192 _ => {
193 debug!(
194 "Bailing out from try_frontend_peek, because statement type is not supported"
195 );
196 return Ok(None);
197 }
198 }
199 }
200
201 let statement_logging_id = if outer_ctx_extra.is_none() {
204 let result = self.statement_logging_frontend.begin_statement_execution(
206 session,
207 ¶ms,
208 &logging,
209 catalog.system_config(),
210 lifecycle_timestamps,
211 );
212
213 if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result {
214 self.log_began_execution(began_execution, mseh_update, prepared_statement);
215 Some(logging_id)
216 } else {
217 None
218 }
219 } else {
220 outer_ctx_extra
227 .take()
228 .and_then(|guard| guard.defuse().retire())
229 };
230
231 let result = self
232 .try_frontend_peek_inner(session, catalog, stmt, params, statement_logging_id)
233 .await;
234
235 if let Some(logging_id) = statement_logging_id {
238 let reason = match &result {
239 Ok(Some(
241 ExecuteResponse::SendingRowsStreaming { .. }
242 | ExecuteResponse::Subscribing { .. },
243 )) => {
244 return result;
247 }
248 Ok(Some(resp @ ExecuteResponse::CopyTo { resp: inner, .. })) => {
250 match inner.as_ref() {
251 ExecuteResponse::SendingRowsStreaming { .. }
252 | ExecuteResponse::Subscribing { .. } => {
253 return result;
256 }
257 _ => resp.into(),
259 }
260 }
261 Ok(None) => {
263 soft_panic_or_log!(
264 "Bailed out from `try_frontend_peek_inner` after we already logged the beginning of statement execution."
265 );
266 self.log_ended_execution(
269 logging_id,
270 StatementEndedExecutionReason::Errored {
271 error: "Internal error: bailed out from `try_frontend_peek_inner`"
272 .to_string(),
273 },
274 );
275 return result;
276 }
277 Ok(Some(resp)) => resp.into(),
282 Err(e) => StatementEndedExecutionReason::Errored {
283 error: e.to_string(),
284 },
285 };
286
287 self.log_ended_execution(logging_id, reason);
288 }
289
290 result
291 }
292
293 async fn try_frontend_peek_inner(
296 &mut self,
297 session: &mut Session,
298 catalog: Arc<Catalog>,
299 stmt: Option<Arc<Statement<Raw>>>,
300 params: Params,
301 statement_logging_id: Option<crate::statement_logging::StatementLoggingId>,
302 ) -> Result<Option<ExecuteResponse>, AdapterError> {
303 let stmt = match stmt {
304 Some(stmt) => stmt,
305 None => {
306 debug!("try_frontend_peek_inner succeeded on an empty query");
307 return Ok(Some(ExecuteResponse::EmptyQuery));
308 }
309 };
310
311 session
312 .metrics()
313 .query_total(&[
314 metrics::session_type_label_value(session.user()),
315 metrics::statement_type_label_value(&stmt),
316 ])
317 .inc();
318
319 let conn_catalog = catalog.for_session(session);
322 let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?;
325
326 let pcx = session.pcx();
327 let (plan, sql_impl_ids) =
328 mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, ¶ms, &resolved_ids)?;
329
330 enum QueryPlan<'a> {
332 Select(&'a SelectPlan),
333 CopyTo(&'a SelectPlan, CopyToContext),
334 Subscribe(&'a SubscribePlan),
335 }
336
337 let (query_plan, explain_ctx) = match &plan {
338 Plan::Select(select_plan) => {
339 let explain_ctx = if session.vars().emit_plan_insights_notice() {
340 let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
341 ExplainContext::PlanInsightsNotice(optimizer_trace)
342 } else {
343 ExplainContext::None
344 };
345 (QueryPlan::Select(select_plan), explain_ctx)
346 }
347 Plan::ShowColumns(show_columns_plan) => {
348 (
350 QueryPlan::Select(&show_columns_plan.select_plan),
351 ExplainContext::None,
352 )
353 }
354 Plan::ExplainPlan(plan::ExplainPlanPlan {
355 stage,
356 format,
357 config,
358 explainee: Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc }),
359 }) => {
360 let optimizer_trace = OptimizerTrace::new(stage.paths());
362 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
363 broken: *broken,
364 config: config.clone(),
365 format: *format,
366 stage: *stage,
367 replan: None,
368 desc: Some(desc.clone()),
369 optimizer_trace,
370 });
371 (QueryPlan::Select(plan), explain_ctx)
372 }
373 Plan::CopyTo(plan::CopyToPlan {
375 select_plan,
376 desc,
377 to,
378 connection,
379 connection_id,
380 format,
381 max_file_size,
382 }) => {
383 let uri = eval_copy_to_uri(to.clone(), session, catalog.state())?;
384
385 let copy_to_ctx = CopyToContext {
387 desc: desc.clone(),
388 uri,
389 connection: connection.clone(),
390 connection_id: *connection_id,
391 format: format.clone(),
392 max_file_size: *max_file_size,
393 output_batch_count: None,
394 };
395
396 (
397 QueryPlan::CopyTo(select_plan, copy_to_ctx),
398 ExplainContext::None,
399 )
400 }
401 Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => {
402 match explainee {
404 plan::Explainee::Statement(plan::ExplaineeStatement::Select {
405 broken: false,
406 plan,
407 desc: _,
408 }) => {
409 let explain_ctx = ExplainContext::Pushdown;
410 (QueryPlan::Select(plan), explain_ctx)
411 }
412 _ => {
413 soft_panic_or_log!(
416 "unexpected EXPLAIN FILTER PUSHDOWN plan kind in frontend peek sequencing: {:?}",
417 explainee
418 );
419 debug!(
420 "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN"
421 );
422 return Ok(None);
423 }
424 }
425 }
426 Plan::SideEffectingFunc(sef_plan) => {
427 let response = self
431 .call_coordinator(|tx| Command::ExecuteSideEffectingFunc {
432 plan: sef_plan.clone(),
433 conn_id: session.conn_id().clone(),
434 current_role: session.role_metadata().current_role,
435 tx,
436 })
437 .await?;
438 return Ok(Some(response));
439 }
440 Plan::Subscribe(subscribe) => (QueryPlan::Subscribe(subscribe), ExplainContext::None),
441 _ => {
442 soft_panic_or_log!(
445 "Unexpected plan kind in frontend peek sequencing: {:?}",
446 plan
447 );
448 debug!(
449 "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, side-effecting SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO S3"
450 );
451 return Ok(None);
452 }
453 };
454
455 let when = match query_plan {
456 QueryPlan::Select(s) => &s.when,
457 QueryPlan::CopyTo(s, _) => &s.when,
458 QueryPlan::Subscribe(s) => &s.when,
459 };
460
461 let depends_on = match query_plan {
462 QueryPlan::Select(s) => s.source.depends_on(),
463 QueryPlan::CopyTo(s, _) => s.source.depends_on(),
464 QueryPlan::Subscribe(s) => s.from.depends_on(),
465 };
466
467 let contains_temporal = match query_plan {
468 QueryPlan::Select(s) => s.source.contains_temporal(),
469 QueryPlan::CopyTo(s, _) => s.source.contains_temporal(),
470 QueryPlan::Subscribe(s) => Ok(s.from.contains_temporal()),
471 };
472
473 assert!(plan.allowed_in_read_only());
477
478 let (cluster, target_cluster_id, target_cluster_name) = {
479 let target_cluster = match session.transaction().cluster() {
480 Some(cluster_id) => TargetCluster::Transaction(cluster_id),
482 None => coord::catalog_serving::auto_run_on_catalog_server(
484 &conn_catalog,
485 session,
486 &plan,
487 ),
488 };
489 let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
490 (cluster, cluster.id, &cluster.name)
491 };
492
493 if let Some(logging_id) = &statement_logging_id {
495 self.log_set_cluster(*logging_id, target_cluster_id);
496 }
497
498 coord::catalog_serving::check_cluster_restrictions(
499 target_cluster_name.as_str(),
500 &conn_catalog,
501 &plan,
502 )?;
503
504 rbac::check_plan(
505 &conn_catalog,
506 None::<fn(u32) -> Option<RoleId>>,
509 session,
510 &plan,
511 Some(target_cluster_id),
512 &resolved_ids,
513 &sql_impl_ids,
514 )?;
515
516 if let Some((_, wait_future)) =
517 coord::appends::waiting_on_startup_appends(&*catalog, session, &plan)
518 {
519 wait_future.await;
520 }
521
522 let max_query_result_size = Some(session.vars().max_query_result_size());
523
524 let compute_instance_snapshot =
529 ComputeInstanceSnapshot::new_without_collections(cluster.id());
530
531 let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config())
532 .override_from(&catalog.get_cluster(cluster.id()).config.features())
533 .override_from(&explain_ctx);
534
535 if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
536 return Err(AdapterError::NoClusterReplicasAvailable {
537 name: cluster.name.clone(),
538 is_managed: cluster.is_managed(),
539 });
540 }
541
542 let (_, view_id) = self.transient_id_gen.allocate_id();
543 let (_, index_id) = self.transient_id_gen.allocate_id();
544
545 let target_replica_name = session.vars().cluster_replica();
546 let mut target_replica = target_replica_name
547 .map(|name| {
548 cluster
549 .replica_id(name)
550 .ok_or(AdapterError::UnknownClusterReplica {
551 cluster_name: cluster.name.clone(),
552 replica_name: name.to_string(),
553 })
554 })
555 .transpose()?;
556
557 let source_ids = depends_on;
558 let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?;
562 if matches!(timeline_context, TimelineContext::TimestampIndependent) && contains_temporal? {
563 timeline_context = TimelineContext::TimestampDependent;
567 }
568
569 let notices = coord::sequencer::check_log_reads(
570 &catalog,
571 cluster,
572 &source_ids,
573 &mut target_replica,
574 session.vars(),
575 )?;
576 session.add_notices(notices);
577
578 let isolation_level = session.vars().transaction_isolation().clone();
581 let timeline = Coordinator::get_timeline(&timeline_context);
582 let needs_linearized_read_ts =
583 Coordinator::needs_linearized_read_ts(&isolation_level, when);
584
585 let oracle_read_ts = match timeline {
586 Some(timeline) if needs_linearized_read_ts => {
587 let oracle = self.ensure_oracle(timeline).await?;
588 let oracle_read_ts = oracle.read_ts().await;
589 Some(oracle_read_ts)
590 }
591 Some(_) | None => None,
592 };
593
594 let vars = session.vars();
597 let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
598 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
599 && !session.contains_read_timestamp()
600 {
601 self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
603 source_ids: source_ids.clone(),
604 real_time_recency_timeout: *vars.real_time_recency_timeout(),
605 tx,
606 })
607 .await?
608 } else {
609 None
610 };
611
612 let dataflow_builder =
615 DataflowBuilder::new(catalog.state(), compute_instance_snapshot.clone());
616 let input_id_bundle = dataflow_builder.sufficient_collections(source_ids.clone());
617
618 let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when)
631 && !matches!(query_plan, QueryPlan::Subscribe { .. });
632
633 let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
635 Some(
638 determination @ TimestampDetermination {
639 timestamp_context: TimestampContext::TimelineTimestamp { .. },
640 ..
641 },
642 ) if in_immediate_multi_stmt_txn => {
643 let txn_read_holds_opt = self
651 .call_coordinator(|tx| Command::GetTransactionReadHoldsBundle {
652 conn_id: session.conn_id().clone(),
653 tx,
654 })
655 .await;
656
657 if let Some(txn_read_holds) = txn_read_holds_opt {
658 let allowed_id_bundle = txn_read_holds.id_bundle();
659 let outside = input_id_bundle.difference(&allowed_id_bundle);
660
661 if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
663 let valid_names =
664 allowed_id_bundle.resolve_names(&*catalog, session.conn_id());
665 let invalid_names = outside.resolve_names(&*catalog, session.conn_id());
666 return Err(AdapterError::RelationOutsideTimeDomain {
667 relations: invalid_names,
668 names: valid_names,
669 });
670 }
671
672 let read_holds = txn_read_holds.subset(&input_id_bundle);
674
675 (determination, read_holds)
676 } else {
677 return Err(AdapterError::Internal(
682 "Missing transaction read holds for multi-statement transaction"
683 .to_string(),
684 ));
685 }
686 }
687 _ => {
688 let timedomain_bundle;
695 let determine_bundle = if in_immediate_multi_stmt_txn {
696 timedomain_bundle = timedomain_for(
700 &*catalog,
701 &dataflow_builder,
702 &source_ids,
703 &timeline_context,
704 session.conn_id(),
705 target_cluster_id,
706 )?;
707 &timedomain_bundle
708 } else {
709 &input_id_bundle
711 };
712 let (determination, read_holds) = self
713 .frontend_determine_timestamp(
714 session,
715 determine_bundle,
716 when,
717 target_cluster_id,
718 &timeline_context,
719 oracle_read_ts,
720 real_time_recency_ts,
721 )
722 .await?;
723
724 if in_immediate_multi_stmt_txn {
728 self.call_coordinator(|tx| Command::StoreTransactionReadHolds {
729 conn_id: session.conn_id().clone(),
730 read_holds: read_holds.clone(),
731 tx,
732 })
733 .await;
734 }
735
736 (determination, read_holds)
737 }
738 };
739
740 {
741 for id in input_id_bundle.iter() {
743 let s = read_holds.storage_holds.contains_key(&id);
744 let c = read_holds
745 .compute_ids()
746 .map(|(_instance, coll)| coll)
747 .contains(&id);
748 soft_assert_or_log!(
749 s || c,
750 "missing read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
751 id,
752 in_immediate_multi_stmt_txn,
753 );
754 }
755
756 for id in input_id_bundle.storage_ids.iter() {
759 soft_assert_or_log!(
760 read_holds.storage_holds.contains_key(id),
761 "missing storage read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
762 id,
763 in_immediate_multi_stmt_txn,
764 );
765 }
766 for id in input_id_bundle
767 .compute_ids
768 .iter()
769 .flat_map(|(_instance, colls)| colls)
770 {
771 soft_assert_or_log!(
772 read_holds
773 .compute_ids()
774 .map(|(_instance, coll)| coll)
775 .contains(id),
776 "missing compute read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
777 id,
778 in_immediate_multi_stmt_txn,
779 );
780 }
781 }
782
783 let requires_linearization = (&explain_ctx).into();
796 let mut transaction_determination = determination.clone();
797 match query_plan {
798 QueryPlan::Subscribe { .. } => {
799 if when.is_transactional() {
800 session.add_transaction_ops(TransactionOps::Subscribe)?;
801 }
802 }
803 QueryPlan::Select(..) | QueryPlan::CopyTo(..) => {
804 if when.is_transactional() {
805 session.add_transaction_ops(TransactionOps::Peeks {
806 determination: transaction_determination,
807 cluster_id: target_cluster_id,
808 requires_linearization,
809 })?;
810 } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
811 transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
813 session.add_transaction_ops(TransactionOps::Peeks {
814 determination: transaction_determination,
815 cluster_id: target_cluster_id,
816 requires_linearization,
817 })?;
818 }
819 }
820 }
821
822 let stats = statistics_oracle(
825 session,
826 &source_ids,
827 &determination.timestamp_context.antichain(),
828 true,
829 catalog.system_config(),
830 &*self.storage_collections,
831 )
832 .await
833 .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
834
835 let timestamp_context = determination.timestamp_context.clone();
838 let session_meta = session.meta();
839 let now = catalog.config().now.clone();
840 let target_cluster_name = target_cluster_name.clone();
841 let needs_plan_insights = explain_ctx.needs_plan_insights();
842 let determination_for_pushdown = if matches!(explain_ctx, ExplainContext::Pushdown) {
843 Some(determination.clone())
846 } else {
847 None
848 };
849
850 let span = Span::current();
851
852 let catalog_for_insights = if needs_plan_insights {
854 Some(Arc::clone(&catalog))
855 } else {
856 None
857 };
858 let mut compute_instances = BTreeMap::new();
859 if needs_plan_insights {
860 for user_cluster in catalog.user_clusters() {
861 let snapshot = ComputeInstanceSnapshot::new_without_collections(user_cluster.id);
862 compute_instances.insert(user_cluster.name.clone(), snapshot);
863 }
864 }
865
866 let source_ids_for_closure = source_ids.clone();
867
868 let optimization_future: JoinHandle<Result<_, AdapterError>> = match query_plan {
869 QueryPlan::CopyTo(select_plan, mut copy_to_ctx) => {
870 let raw_expr = select_plan.source.clone();
871
872 let worker_counts = cluster.replicas().map(|r| {
874 let loc = &r.config.location;
875 loc.workers().unwrap_or_else(|| loc.num_processes())
876 });
877 let max_worker_count = match worker_counts.max() {
878 Some(count) => u64::cast_from(count),
879 None => {
880 return Err(AdapterError::NoClusterReplicasAvailable {
881 name: cluster.name.clone(),
882 is_managed: cluster.is_managed(),
883 });
884 }
885 };
886 copy_to_ctx.output_batch_count = Some(max_worker_count);
887
888 let mut optimizer = optimize::copy_to::Optimizer::new(
889 Arc::clone(&catalog),
890 compute_instance_snapshot,
891 view_id,
892 copy_to_ctx,
893 optimizer_config,
894 self.optimizer_metrics.clone(),
895 );
896
897 mz_ore::task::spawn_blocking(
898 || "optimize copy-to",
899 move || {
900 span.in_scope(|| {
901 let _dispatch_guard = explain_ctx.dispatch_guard();
902
903 let local_mir_plan =
906 optimizer.catch_unwind_optimize(raw_expr.clone())?;
907 let local_mir_plan = local_mir_plan.resolve(
909 timestamp_context.clone(),
910 &session_meta,
911 stats,
912 );
913 let global_lir_plan =
915 optimizer.catch_unwind_optimize(local_mir_plan)?;
916 Ok(Execution::CopyToS3 {
917 global_lir_plan,
918 source_ids: source_ids_for_closure,
919 })
920 })
921 },
922 )
923 }
924 QueryPlan::Select(select_plan) => {
925 let select_plan = select_plan.clone();
926 let raw_expr = select_plan.source.clone();
927
928 let mut optimizer = optimize::peek::Optimizer::new(
930 Arc::clone(&catalog),
931 compute_instance_snapshot,
932 select_plan.finishing.clone(),
933 view_id,
934 index_id,
935 optimizer_config,
936 self.optimizer_metrics.clone(),
937 );
938
939 mz_ore::task::spawn_blocking(
940 || "optimize peek",
941 move || {
942 span.in_scope(|| {
943 let _dispatch_guard = explain_ctx.dispatch_guard();
944
945 let pipeline = || {
952 let local_mir_plan =
953 optimizer.catch_unwind_optimize(raw_expr.clone())?;
954 let local_mir_plan = local_mir_plan.resolve(
956 timestamp_context.clone(),
957 &session_meta,
958 stats,
959 );
960 let global_lir_plan =
962 optimizer.catch_unwind_optimize(local_mir_plan)?;
963 Ok::<_, AdapterError>(global_lir_plan)
964 };
965
966 let global_lir_plan_result = pipeline();
967 let optimization_finished_at = now();
968
969 let create_insights_ctx =
970 |optimizer: &optimize::peek::Optimizer,
971 is_notice: bool|
972 -> Option<Box<PlanInsightsContext>> {
973 if !needs_plan_insights {
974 return None;
975 }
976
977 let catalog = catalog_for_insights.as_ref()?;
978
979 let enable_re_optimize = if needs_plan_insights {
980 let dyncfgs = catalog.system_config().dyncfgs();
988 let opt_limit = mz_adapter_types::dyncfgs
989 ::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
990 .get(dyncfgs);
991 !(is_notice && optimizer.duration() > opt_limit)
992 } else {
993 false
994 };
995
996 Some(Box::new(PlanInsightsContext {
997 stmt: select_plan
998 .select
999 .as_deref()
1000 .map(Clone::clone)
1001 .map(Statement::Select),
1002 raw_expr: raw_expr.clone(),
1003 catalog: Arc::clone(catalog),
1004 compute_instances,
1005 target_instance: target_cluster_name,
1006 metrics: optimizer.metrics().clone(),
1007 finishing: optimizer.finishing().clone(),
1008 optimizer_config: optimizer.config().clone(),
1009 session: session_meta,
1010 timestamp_context,
1011 view_id: optimizer.select_id(),
1012 index_id: optimizer.index_id(),
1013 enable_re_optimize,
1014 }))
1015 };
1016
1017 let global_lir_plan = match global_lir_plan_result {
1018 Ok(plan) => plan,
1019 Err(err) => {
1020 let result = if let ExplainContext::Plan(explain_ctx) =
1021 explain_ctx
1022 && explain_ctx.broken
1023 {
1024 tracing::error!(
1026 "error while handling EXPLAIN statement: {}",
1027 err
1028 );
1029 Ok(Execution::ExplainPlan {
1030 df_meta: Default::default(),
1031 explain_ctx,
1032 optimizer,
1033 insights_ctx: None,
1034 })
1035 } else {
1036 Err(err)
1037 };
1038 return result;
1039 }
1040 };
1041
1042 match explain_ctx {
1043 ExplainContext::Plan(explain_ctx) => {
1044 let (_, df_meta, _) = global_lir_plan.unapply();
1045 let insights_ctx = create_insights_ctx(&optimizer, false);
1046 Ok(Execution::ExplainPlan {
1047 df_meta,
1048 explain_ctx,
1049 optimizer,
1050 insights_ctx,
1051 })
1052 }
1053 ExplainContext::None => Ok(Execution::Peek {
1054 global_lir_plan,
1055 optimization_finished_at,
1056 plan_insights_optimizer_trace: None,
1057 finishing: select_plan.finishing,
1058 copy_to: select_plan.copy_to,
1059 insights_ctx: None,
1060 }),
1061 ExplainContext::PlanInsightsNotice(optimizer_trace) => {
1062 let insights_ctx = create_insights_ctx(&optimizer, true);
1063 Ok(Execution::Peek {
1064 global_lir_plan,
1065 optimization_finished_at,
1066 plan_insights_optimizer_trace: Some(optimizer_trace),
1067 finishing: select_plan.finishing,
1068 copy_to: select_plan.copy_to,
1069 insights_ctx,
1070 })
1071 }
1072 ExplainContext::Pushdown => {
1073 let (plan, _, _) = global_lir_plan.unapply();
1074 let imports = match plan {
1075 PeekPlan::SlowPath(plan) => plan
1076 .desc
1077 .source_imports
1078 .into_iter()
1079 .filter_map(|(id, import)| {
1080 import.desc.arguments.operators.map(|mfp| (id, mfp))
1081 })
1082 .collect(),
1083 PeekPlan::FastPath(_) => {
1084 std::collections::BTreeMap::default()
1085 }
1086 };
1087 Ok(Execution::ExplainPushdown {
1088 imports,
1089 determination: determination_for_pushdown
1090 .expect("it's present for the ExplainPushdown case"),
1091 })
1092 }
1093 }
1094 })
1095 },
1096 )
1097 }
1098 QueryPlan::Subscribe(plan) => {
1099 let plan = plan.clone();
1100 let catalog: Arc<Catalog> = Arc::clone(&catalog);
1101 let debug_name = format!("subscribe-{}", index_id);
1102 let mut optimizer = optimize::subscribe::Optimizer::new(
1103 catalog,
1104 compute_instance_snapshot.clone(),
1105 view_id,
1106 index_id,
1107 plan.with_snapshot,
1108 plan.up_to,
1109 debug_name,
1110 optimizer_config,
1111 self.optimizer_metrics.clone(),
1112 );
1113 mz_ore::task::spawn_blocking(
1114 || "optimize subscribe",
1115 move || {
1116 span.in_scope(|| {
1117 let _dispatch_guard = explain_ctx.dispatch_guard();
1118
1119 let global_mir_plan = optimizer.catch_unwind_optimize(plan.clone())?;
1120 let as_of = timestamp_context.timestamp_or_default();
1121
1122 if let Some(up_to) = optimizer.up_to() {
1123 if as_of > up_to {
1124 return Err(AdapterError::AbsurdSubscribeBounds {
1125 as_of,
1126 up_to,
1127 });
1128 }
1129 }
1130 let local_mir_plan =
1131 global_mir_plan.resolve(Antichain::from_elem(as_of));
1132
1133 let global_lir_plan =
1134 optimizer.catch_unwind_optimize(local_mir_plan)?;
1135 let optimization_finished_at = now();
1136
1137 let (df_desc, df_meta) = global_lir_plan.unapply();
1138 Ok(Execution::Subscribe {
1139 subscribe_plan: plan,
1140 df_desc,
1141 df_meta,
1142 optimization_finished_at,
1143 })
1144 })
1145 },
1146 )
1147 }
1148 };
1149
1150 let mut optimization_timeout = *session.vars().statement_timeout();
1151 if optimization_timeout == Duration::ZERO {
1153 optimization_timeout = Duration::MAX;
1154 }
1155 let optimization_result =
1156 match tokio::time::timeout(optimization_timeout, optimization_future).await {
1161 Ok(Ok(result)) => result,
1162 Ok(Err(AdapterError::Optimizer(err))) => {
1163 return Err(AdapterError::Internal(format!(
1164 "internal error in optimizer: {}",
1165 err
1166 )));
1167 }
1168 Ok(Err(err)) => {
1169 return Err(err);
1170 }
1171 Err(_elapsed) => {
1172 warn!("optimize peek timed out after {:?}", optimization_timeout);
1173 return Err(AdapterError::StatementTimeout);
1174 }
1175 };
1176
1177 if let Some(logging_id) = &statement_logging_id {
1179 self.log_lifecycle_event(*logging_id, StatementLifecycleEvent::OptimizationFinished);
1180 }
1181
1182 Self::assert_read_holds_correct(
1184 &read_holds,
1185 &optimization_result,
1186 &determination,
1187 target_cluster_id,
1188 in_immediate_multi_stmt_txn,
1189 );
1190
1191 match optimization_result {
1193 Execution::ExplainPlan {
1194 df_meta,
1195 explain_ctx,
1196 optimizer,
1197 insights_ctx,
1198 } => {
1199 let rows = coord::sequencer::explain_plan_inner(
1200 session,
1201 &catalog,
1202 df_meta,
1203 explain_ctx,
1204 optimizer,
1205 insights_ctx,
1206 )
1207 .await?;
1208
1209 Ok(Some(ExecuteResponse::SendingRowsImmediate {
1210 rows: Box::new(rows.into_row_iter()),
1211 }))
1212 }
1213 Execution::ExplainPushdown {
1214 imports,
1215 determination,
1216 } => {
1217 let as_of = determination.timestamp_context.antichain();
1220 let mz_now = determination
1221 .timestamp_context
1222 .timestamp()
1223 .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1224 .unwrap_or_else(ResultSpec::value_all);
1225
1226 Ok(Some(
1227 coord::sequencer::explain_pushdown_future_inner(
1228 session,
1229 &*catalog,
1230 &self.storage_collections,
1231 as_of,
1232 mz_now,
1233 imports,
1234 )
1235 .await
1236 .await?,
1237 ))
1238 }
1239 Execution::Peek {
1240 global_lir_plan,
1241 optimization_finished_at: _optimization_finished_at,
1242 plan_insights_optimizer_trace,
1243 finishing,
1244 copy_to,
1245 insights_ctx,
1246 } => {
1247 let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
1252
1253 coord::sequencer::emit_optimizer_notices(
1254 &*catalog,
1255 session,
1256 &df_meta.optimizer_notices,
1257 );
1258
1259 if let Some(trace) = plan_insights_optimizer_trace {
1261 let target_cluster = catalog.get_cluster(target_cluster_id);
1262 let features = OptimizerFeatures::from(catalog.system_config())
1263 .override_from(&target_cluster.config.features());
1264 let insights = trace
1265 .into_plan_insights(
1266 &features,
1267 &catalog.for_session(session),
1268 Some(finishing.clone()),
1269 Some(target_cluster),
1270 df_meta.clone(),
1271 insights_ctx,
1272 )
1273 .await?;
1274 session.add_notice(AdapterNotice::PlanInsights(insights));
1275 }
1276
1277 let watch_set = statement_logging_id.map(|logging_id| {
1280 WatchSetCreation::new(
1281 logging_id,
1282 catalog.state(),
1283 &input_id_bundle,
1284 determination.timestamp_context.timestamp_or_default(),
1285 )
1286 });
1287
1288 let max_result_size = catalog.system_config().max_result_size();
1289
1290 let determination_for_notice = if session.vars().emit_timestamp_notice() {
1293 Some(determination.clone())
1294 } else {
1295 None
1296 };
1297
1298 let response = match peek_plan {
1299 PeekPlan::FastPath(fast_path_plan) => {
1300 if let Some(logging_id) = &statement_logging_id {
1301 if !matches!(fast_path_plan, FastPathPlan::Constant(..)) {
1315 self.log_set_timestamp(
1316 *logging_id,
1317 determination.timestamp_context.timestamp_or_default(),
1318 );
1319 }
1320 }
1321
1322 let row_set_finishing_seconds =
1323 session.metrics().row_set_finishing_seconds().clone();
1324
1325 let peek_stash_read_batch_size_bytes =
1326 mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
1327 .get(catalog.system_config().dyncfgs());
1328 let peek_stash_read_memory_budget_bytes =
1329 mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
1330 .get(catalog.system_config().dyncfgs());
1331
1332 self.implement_fast_path_peek_plan(
1333 fast_path_plan,
1334 determination.timestamp_context.timestamp_or_default(),
1335 finishing,
1336 target_cluster_id,
1337 target_replica,
1338 typ,
1339 max_result_size,
1340 max_query_result_size,
1341 row_set_finishing_seconds,
1342 read_holds,
1343 peek_stash_read_batch_size_bytes,
1344 peek_stash_read_memory_budget_bytes,
1345 session.conn_id().clone(),
1346 source_ids,
1347 watch_set,
1348 )
1349 .await?
1350 }
1351 PeekPlan::SlowPath(dataflow_plan) => {
1352 if let Some(logging_id) = &statement_logging_id {
1353 self.log_set_transient_index_id(*logging_id, dataflow_plan.id);
1354 }
1355
1356 self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
1357 dataflow_plan: Box::new(dataflow_plan),
1358 determination,
1359 finishing,
1360 compute_instance: target_cluster_id,
1361 target_replica,
1362 intermediate_result_type: typ,
1363 source_ids,
1364 conn_id: session.conn_id().clone(),
1365 max_result_size,
1366 max_query_result_size,
1367 watch_set,
1368 tx,
1369 })
1370 .await?
1371 }
1372 };
1373
1374 if let Some(determination) = determination_for_notice {
1376 let explanation = self
1377 .call_coordinator(|tx| Command::ExplainTimestamp {
1378 conn_id: session.conn_id().clone(),
1379 session_wall_time: session.pcx().wall_time,
1380 cluster_id: target_cluster_id,
1381 id_bundle: input_id_bundle.clone(),
1382 determination,
1383 tx,
1384 })
1385 .await;
1386 session.add_notice(AdapterNotice::QueryTimestamp { explanation });
1387 }
1388
1389 Ok(Some(match copy_to {
1390 None => response,
1391 Some(format) => ExecuteResponse::CopyTo {
1393 format,
1394 resp: Box::new(response),
1395 },
1396 }))
1397 }
1398 Execution::Subscribe {
1399 subscribe_plan,
1400 df_desc,
1401 df_meta,
1402 optimization_finished_at: _optimization_finished_at,
1403 } => {
1404 if df_desc.as_of.as_ref().expect("as of set") == &df_desc.until {
1405 session.add_notice(AdapterNotice::EqualSubscribeBounds {
1406 bound: *df_desc.until.as_option().expect("as of set"),
1407 });
1408 }
1409 coord::sequencer::emit_optimizer_notices(
1410 &*catalog,
1411 session,
1412 &df_meta.optimizer_notices,
1413 );
1414
1415 let response = self
1416 .call_coordinator(|tx| Command::ExecuteSubscribe {
1417 df_desc,
1418 dependency_ids: subscribe_plan.from.depends_on(),
1419 cluster_id: target_cluster_id,
1420 replica_id: target_replica,
1421 conn_id: session.conn_id().clone(),
1422 session_uuid: session.uuid(),
1423 read_holds,
1424 plan: subscribe_plan,
1425 statement_logging_id,
1426 tx,
1427 })
1428 .await?;
1429 Ok(Some(response))
1430 }
1431 Execution::CopyToS3 {
1432 global_lir_plan,
1433 source_ids,
1434 } => {
1435 let (df_desc, df_meta) = global_lir_plan.unapply();
1436
1437 coord::sequencer::emit_optimizer_notices(
1438 &*catalog,
1439 session,
1440 &df_meta.optimizer_notices,
1441 );
1442
1443 let sink_id = df_desc.sink_id();
1445 let sinks = &df_desc.sink_exports;
1446 if sinks.len() != 1 {
1447 return Err(AdapterError::Internal(
1448 "expected exactly one copy to s3 sink".into(),
1449 ));
1450 }
1451 let (_, sink_desc) = sinks
1452 .first_key_value()
1453 .expect("known to be exactly one copy to s3 sink");
1454 let s3_sink_connection = match &sink_desc.connection {
1455 mz_compute_types::sinks::ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1456 conn.clone()
1457 }
1458 _ => {
1459 return Err(AdapterError::Internal(
1460 "expected copy to s3 oneshot sink".into(),
1461 ));
1462 }
1463 };
1464
1465 self.call_coordinator(|tx| Command::CopyToPreflight {
1468 s3_sink_connection,
1469 sink_id,
1470 tx,
1471 })
1472 .await?;
1473
1474 let watch_set = statement_logging_id.map(|logging_id| {
1476 WatchSetCreation::new(
1477 logging_id,
1478 catalog.state(),
1479 &input_id_bundle,
1480 determination.timestamp_context.timestamp_or_default(),
1481 )
1482 });
1483
1484 let response = self
1485 .call_coordinator(|tx| Command::ExecuteCopyTo {
1486 df_desc: Box::new(df_desc),
1487 compute_instance: target_cluster_id,
1488 target_replica,
1489 source_ids,
1490 conn_id: session.conn_id().clone(),
1491 watch_set,
1492 tx,
1493 })
1494 .await?;
1495
1496 Ok(Some(response))
1497 }
1498 }
1499 }
1500
1501 pub(crate) async fn frontend_determine_timestamp(
1508 &mut self,
1509 session: &Session,
1510 id_bundle: &CollectionIdBundle,
1511 when: &QueryWhen,
1512 compute_instance: ComputeInstanceId,
1513 timeline_context: &TimelineContext,
1514 oracle_read_ts: Option<Timestamp>,
1515 real_time_recency_ts: Option<Timestamp>,
1516 ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
1517 let isolation_level = session.vars().transaction_isolation();
1520
1521 let (read_holds, upper) = self
1522 .acquire_read_holds_and_least_valid_write(id_bundle)
1523 .await
1524 .map_err(|err| {
1525 AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
1526 err,
1527 compute_instance,
1528 )
1529 })?;
1530 let (det, read_holds) = <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1531 session,
1532 id_bundle,
1533 when,
1534 timeline_context,
1535 oracle_read_ts,
1536 real_time_recency_ts,
1537 isolation_level,
1538 read_holds,
1539 upper.clone(),
1540 )?;
1541
1542 session
1543 .metrics()
1544 .determine_timestamp(&[
1545 match det.respond_immediately() {
1546 true => "true",
1547 false => "false",
1548 },
1549 isolation_level.as_str(),
1550 &compute_instance.to_string(),
1551 ])
1552 .inc();
1553 if !det.respond_immediately()
1554 && isolation_level == &IsolationLevel::StrictSerializable
1555 && real_time_recency_ts.is_none()
1556 {
1557 if let Some(strict) = det.timestamp_context.timestamp() {
1559 let (serializable_det, _tmp_read_holds) =
1560 <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1561 session,
1562 id_bundle,
1563 when,
1564 timeline_context,
1565 oracle_read_ts,
1566 real_time_recency_ts,
1567 &IsolationLevel::Serializable,
1568 read_holds.clone(),
1569 upper,
1570 )?;
1571 if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
1572 session
1573 .metrics()
1574 .timestamp_difference_for_strict_serializable_ms(&[compute_instance
1575 .to_string()
1576 .as_ref()])
1577 .observe(f64::cast_lossy(u64::from(
1578 strict.saturating_sub(*serializable),
1579 )));
1580 }
1581 }
1582 }
1583
1584 Ok((det, read_holds))
1585 }
1586
1587 fn assert_read_holds_correct(
1588 read_holds: &ReadHolds,
1589 execution: &Execution,
1590 determination: &TimestampDetermination,
1591 target_cluster_id: ClusterId,
1592 in_immediate_multi_stmt_txn: bool,
1593 ) {
1594 let (source_imports, index_imports, as_of, execution_name): (
1596 Vec<GlobalId>,
1597 Vec<GlobalId>,
1598 Timestamp,
1599 &str,
1600 ) = match execution {
1601 Execution::Peek {
1602 global_lir_plan, ..
1603 } => match global_lir_plan.peek_plan() {
1604 PeekPlan::FastPath(fast_path_plan) => {
1605 let (sources, indexes) = match fast_path_plan {
1606 FastPathPlan::Constant(..) => (vec![], vec![]),
1607 FastPathPlan::PeekExisting(_coll_id, idx_id, ..) => (vec![], vec![*idx_id]),
1608 FastPathPlan::PeekPersist(global_id, ..) => (vec![*global_id], vec![]),
1609 };
1610 (
1611 sources,
1612 indexes,
1613 determination.timestamp_context.timestamp_or_default(),
1614 "FastPath",
1615 )
1616 }
1617 PeekPlan::SlowPath(dataflow_plan) => {
1618 let as_of = dataflow_plan
1619 .desc
1620 .as_of
1621 .clone()
1622 .expect("dataflow has an as_of")
1623 .into_element();
1624 (
1625 dataflow_plan.desc.source_imports.keys().cloned().collect(),
1626 dataflow_plan.desc.index_imports.keys().cloned().collect(),
1627 as_of,
1628 "SlowPath",
1629 )
1630 }
1631 },
1632 Execution::CopyToS3 {
1633 global_lir_plan, ..
1634 } => {
1635 let df_desc = global_lir_plan.df_desc();
1636 let as_of = df_desc
1637 .as_of
1638 .clone()
1639 .expect("dataflow has an as_of")
1640 .into_element();
1641 (
1642 df_desc.source_imports.keys().cloned().collect(),
1643 df_desc.index_imports.keys().cloned().collect(),
1644 as_of,
1645 "CopyToS3",
1646 )
1647 }
1648 Execution::ExplainPlan { .. } | Execution::ExplainPushdown { .. } => {
1649 return;
1651 }
1652 Execution::Subscribe { df_desc, .. } => {
1653 let as_of = df_desc
1654 .as_of
1655 .clone()
1656 .expect("dataflow has an as_of")
1657 .into_element();
1658 (
1659 df_desc.source_imports.keys().cloned().collect(),
1660 df_desc.index_imports.keys().cloned().collect(),
1661 as_of,
1662 "Subscribe",
1663 )
1664 }
1665 };
1666
1667 for id in source_imports.iter() {
1669 soft_assert_or_log!(
1670 read_holds.storage_holds.contains_key(id),
1671 "[{}] missing read hold for the source import {}; (in_immediate_multi_stmt_txn: {})",
1672 execution_name,
1673 id,
1674 in_immediate_multi_stmt_txn,
1675 );
1676 }
1677 for id in index_imports.iter() {
1678 soft_assert_or_log!(
1679 read_holds
1680 .compute_ids()
1681 .map(|(_instance, coll)| coll)
1682 .contains(id),
1683 "[{}] missing read hold for the index import {}; (in_immediate_multi_stmt_txn: {})",
1684 execution_name,
1685 id,
1686 in_immediate_multi_stmt_txn,
1687 );
1688 }
1689
1690 for (id, h) in read_holds.storage_holds.iter() {
1692 soft_assert_or_log!(
1693 h.since().less_equal(&as_of),
1694 "[{}] storage read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1695 execution_name,
1696 h.since(),
1697 id,
1698 as_of,
1699 determination,
1700 in_immediate_multi_stmt_txn,
1701 );
1702 }
1703 for ((instance, id), h) in read_holds.compute_holds.iter() {
1704 soft_assert_eq_or_log!(
1705 *instance,
1706 target_cluster_id,
1707 "[{}] the read hold on {} is on the wrong cluster; (in_immediate_multi_stmt_txn: {})",
1708 execution_name,
1709 id,
1710 in_immediate_multi_stmt_txn,
1711 );
1712 soft_assert_or_log!(
1713 h.since().less_equal(&as_of),
1714 "[{}] compute read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1715 execution_name,
1716 h.since(),
1717 id,
1718 as_of,
1719 determination,
1720 in_immediate_multi_stmt_txn,
1721 );
1722 }
1723 }
1724}
1725
1726enum Execution {
1728 Peek {
1729 global_lir_plan: optimize::peek::GlobalLirPlan,
1730 optimization_finished_at: EpochMillis,
1731 plan_insights_optimizer_trace: Option<OptimizerTrace>,
1732 finishing: RowSetFinishing,
1733 copy_to: Option<plan::CopyFormat>,
1734 insights_ctx: Option<Box<PlanInsightsContext>>,
1735 },
1736 Subscribe {
1737 subscribe_plan: SubscribePlan,
1738 df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
1739 df_meta: DataflowMetainfo,
1740 optimization_finished_at: EpochMillis,
1741 },
1742 CopyToS3 {
1743 global_lir_plan: optimize::copy_to::GlobalLirPlan,
1744 source_ids: BTreeSet<GlobalId>,
1745 },
1746 ExplainPlan {
1747 df_meta: DataflowMetainfo,
1748 explain_ctx: ExplainPlanContext,
1749 optimizer: optimize::peek::Optimizer,
1750 insights_ctx: Option<Box<PlanInsightsContext>>,
1751 },
1752 ExplainPushdown {
1753 imports: BTreeMap<GlobalId, mz_expr::MapFilterProject>,
1754 determination: TimestampDetermination,
1755 },
1756}