1use std::collections::BTreeMap;
11use std::collections::BTreeSet;
12use std::sync::Arc;
13
14use itertools::Itertools;
15use mz_adapter_types::dyncfgs::ENABLE_FRONTEND_SUBSCRIBES;
16use mz_compute_types::ComputeInstanceId;
17use mz_compute_types::dataflows::DataflowDescription;
18use mz_controller_types::ClusterId;
19use mz_expr::{CollectionPlan, ResultSpec, RowSetFinishing};
20use mz_ore::cast::{CastFrom, CastLossy};
21use mz_ore::collections::CollectionExt;
22use mz_ore::now::EpochMillis;
23use mz_ore::task::JoinHandle;
24use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
25use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
26use mz_repr::role_id::RoleId;
27use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
28use mz_sql::ast::Raw;
29use mz_sql::catalog::CatalogCluster;
30use mz_sql::plan::Params;
31use mz_sql::plan::{
32 self, Explainee, ExplaineeStatement, Plan, QueryWhen, SelectPlan, SubscribePlan,
33};
34use mz_sql::rbac;
35use mz_sql::session::metadata::SessionMetadata;
36use mz_sql::session::vars::IsolationLevel;
37use mz_sql_parser::ast::{CopyDirection, ExplainStage, ShowStatement, Statement};
38use mz_transform::EmptyStatisticsOracle;
39use mz_transform::dataflow::DataflowMetainfo;
40use opentelemetry::trace::TraceContextExt;
41use timely::progress::Antichain;
42use tracing::{Span, debug, warn};
43use tracing_opentelemetry::OpenTelemetrySpanExt;
44
45use crate::catalog::Catalog;
46use crate::command::Command;
47use crate::coord::peek::{FastPathPlan, PeekPlan};
48use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle};
49use crate::coord::timeline::timedomain_for;
50use crate::coord::timestamp_selection::TimestampDetermination;
51use crate::coord::{
52 Coordinator, CopyToContext, ExecuteContextGuard, ExplainContext, ExplainPlanContext,
53 TargetCluster,
54};
55use crate::explain::insights::PlanInsightsContext;
56use crate::explain::optimizer_trace::OptimizerTrace;
57use crate::optimize::Optimize;
58use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
59use crate::session::{Session, TransactionOps, TransactionStatus};
60use crate::statement_logging::WatchSetCreation;
61use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
62use crate::{
63 AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, PeekClient, ReadHolds,
64 TimelineContext, TimestampContext, TimestampProvider, optimize,
65};
66use crate::{coord, metrics};
67
68impl PeekClient {
69 pub(crate) async fn try_frontend_peek(
77 &mut self,
78 portal_name: &str,
79 session: &mut Session,
80 outer_ctx_extra: &mut Option<ExecuteContextGuard>,
81 ) -> Result<Option<ExecuteResponse>, AdapterError> {
82 if session.vars().emit_trace_id_notice() {
85 let span_context = tracing::Span::current()
86 .context()
87 .span()
88 .span_context()
89 .clone();
90 if span_context.is_valid() {
91 session.add_notice(AdapterNotice::QueryTrace {
92 trace_id: span_context.trace_id(),
93 });
94 }
95 }
96
97 let catalog = self.catalog_snapshot("try_frontend_peek").await;
104
105 let (stmt, params, logging, lifecycle_timestamps) = {
107 if let Err(err) = Coordinator::verify_portal(&*catalog, session, portal_name) {
108 outer_ctx_extra
109 .take()
110 .and_then(|guard| guard.defuse().retire());
111 return Err(err);
112 }
113 let portal = session
114 .get_portal_unverified(portal_name)
115 .expect("called verify_portal above");
118 let params = portal.parameters.clone();
119 let stmt = portal.stmt.clone();
120 let logging = Arc::clone(&portal.logging);
121 let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
122 (stmt, params, logging, lifecycle_timestamps)
123 };
124
125 if let Some(ref stmt) = stmt {
128 match &**stmt {
129 Statement::Select(_)
130 | Statement::ExplainAnalyzeObject(_)
131 | Statement::ExplainAnalyzeCluster(_)
132 | Statement::Show(ShowStatement::ShowObjects(_))
133 | Statement::Show(ShowStatement::ShowColumns(_)) => {
134 }
139 Statement::ExplainPlan(explain_stmt) => {
140 match &explain_stmt.explainee {
145 mz_sql_parser::ast::Explainee::Select(..) => {
146 }
148 _ => {
149 debug!(
150 "Bailing out from try_frontend_peek, because EXPLAIN is not for a SELECT query"
151 );
152 return Ok(None);
153 }
154 }
155 }
156 Statement::ExplainPushdown(explain_stmt) => {
157 match &explain_stmt.explainee {
159 mz_sql_parser::ast::Explainee::Select(_, false) => {}
160 _ => {
161 debug!(
162 "Bailing out from try_frontend_peek, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is for EXPLAIN BROKEN"
163 );
164 return Ok(None);
165 }
166 }
167 }
168 Statement::Copy(copy_stmt) => {
169 match ©_stmt.direction {
170 CopyDirection::To => {
171 }
173 CopyDirection::From => {
174 debug!(
175 "Bailing out from try_frontend_peek, because COPY FROM is not supported"
176 );
177 return Ok(None);
178 }
179 }
180 }
181
182 Statement::Subscribe(_)
183 if ENABLE_FRONTEND_SUBSCRIBES.get(catalog.system_config().dyncfgs()) =>
184 {
185 }
187 _ => {
188 debug!(
189 "Bailing out from try_frontend_peek, because statement type is not supported"
190 );
191 return Ok(None);
192 }
193 }
194 }
195
196 let statement_logging_id = if outer_ctx_extra.is_none() {
199 let result = self.statement_logging_frontend.begin_statement_execution(
201 session,
202 ¶ms,
203 &logging,
204 catalog.system_config(),
205 lifecycle_timestamps,
206 );
207
208 if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result {
209 self.log_began_execution(began_execution, mseh_update, prepared_statement);
210 Some(logging_id)
211 } else {
212 None
213 }
214 } else {
215 outer_ctx_extra
222 .take()
223 .and_then(|guard| guard.defuse().retire())
224 };
225
226 let result = self
227 .try_frontend_peek_inner(session, catalog, stmt, params, statement_logging_id)
228 .await;
229
230 if let Some(logging_id) = statement_logging_id {
233 let reason = match &result {
234 Ok(Some(
236 ExecuteResponse::SendingRowsStreaming { .. }
237 | ExecuteResponse::Subscribing { .. },
238 )) => {
239 return result;
242 }
243 Ok(Some(resp @ ExecuteResponse::CopyTo { resp: inner, .. })) => {
245 match inner.as_ref() {
246 ExecuteResponse::SendingRowsStreaming { .. }
247 | ExecuteResponse::Subscribing { .. } => {
248 return result;
251 }
252 _ => resp.into(),
254 }
255 }
256 Ok(None) => {
258 soft_panic_or_log!(
259 "Bailed out from `try_frontend_peek_inner` after we already logged the beginning of statement execution."
260 );
261 self.log_ended_execution(
264 logging_id,
265 StatementEndedExecutionReason::Errored {
266 error: "Internal error: bailed out from `try_frontend_peek_inner`"
267 .to_string(),
268 },
269 );
270 return result;
271 }
272 Ok(Some(resp)) => resp.into(),
277 Err(e) => StatementEndedExecutionReason::Errored {
278 error: e.to_string(),
279 },
280 };
281
282 self.log_ended_execution(logging_id, reason);
283 }
284
285 result
286 }
287
288 async fn try_frontend_peek_inner(
291 &mut self,
292 session: &mut Session,
293 catalog: Arc<Catalog>,
294 stmt: Option<Arc<Statement<Raw>>>,
295 params: Params,
296 statement_logging_id: Option<crate::statement_logging::StatementLoggingId>,
297 ) -> Result<Option<ExecuteResponse>, AdapterError> {
298 let stmt = match stmt {
299 Some(stmt) => stmt,
300 None => {
301 debug!("try_frontend_peek_inner succeeded on an empty query");
302 return Ok(Some(ExecuteResponse::EmptyQuery));
303 }
304 };
305
306 session
307 .metrics()
308 .query_total(&[
309 metrics::session_type_label_value(session.user()),
310 metrics::statement_type_label_value(&stmt),
311 ])
312 .inc();
313
314 let conn_catalog = catalog.for_session(session);
317 let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?;
320
321 let pcx = session.pcx();
322 let plan = mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, ¶ms, &resolved_ids)?;
323
324 enum QueryPlan<'a> {
326 Select(&'a SelectPlan),
327 CopyTo(&'a SelectPlan, CopyToContext),
328 Subscribe(&'a SubscribePlan),
329 }
330
331 let (query_plan, explain_ctx) = match &plan {
332 Plan::Select(select_plan) => {
333 let explain_ctx = if session.vars().emit_plan_insights_notice() {
334 let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
335 ExplainContext::PlanInsightsNotice(optimizer_trace)
336 } else {
337 ExplainContext::None
338 };
339 (QueryPlan::Select(select_plan), explain_ctx)
340 }
341 Plan::ShowColumns(show_columns_plan) => {
342 (
344 QueryPlan::Select(&show_columns_plan.select_plan),
345 ExplainContext::None,
346 )
347 }
348 Plan::ExplainPlan(plan::ExplainPlanPlan {
349 stage,
350 format,
351 config,
352 explainee: Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc }),
353 }) => {
354 let optimizer_trace = OptimizerTrace::new(stage.paths());
356 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
357 broken: *broken,
358 config: config.clone(),
359 format: *format,
360 stage: *stage,
361 replan: None,
362 desc: Some(desc.clone()),
363 optimizer_trace,
364 });
365 (QueryPlan::Select(plan), explain_ctx)
366 }
367 Plan::CopyTo(plan::CopyToPlan {
369 select_plan,
370 desc,
371 to,
372 connection,
373 connection_id,
374 format,
375 max_file_size,
376 }) => {
377 let uri = eval_copy_to_uri(to.clone(), session, catalog.state())?;
378
379 let copy_to_ctx = CopyToContext {
381 desc: desc.clone(),
382 uri,
383 connection: connection.clone(),
384 connection_id: *connection_id,
385 format: format.clone(),
386 max_file_size: *max_file_size,
387 output_batch_count: None,
388 };
389
390 (
391 QueryPlan::CopyTo(select_plan, copy_to_ctx),
392 ExplainContext::None,
393 )
394 }
395 Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => {
396 match explainee {
398 plan::Explainee::Statement(plan::ExplaineeStatement::Select {
399 broken: false,
400 plan,
401 desc: _,
402 }) => {
403 let explain_ctx = ExplainContext::Pushdown;
404 (QueryPlan::Select(plan), explain_ctx)
405 }
406 _ => {
407 soft_panic_or_log!(
410 "unexpected EXPLAIN FILTER PUSHDOWN plan kind in frontend peek sequencing: {:?}",
411 explainee
412 );
413 debug!(
414 "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN"
415 );
416 return Ok(None);
417 }
418 }
419 }
420 Plan::SideEffectingFunc(sef_plan) => {
421 let response = self
425 .call_coordinator(|tx| Command::ExecuteSideEffectingFunc {
426 plan: sef_plan.clone(),
427 conn_id: session.conn_id().clone(),
428 current_role: session.role_metadata().current_role,
429 tx,
430 })
431 .await?;
432 return Ok(Some(response));
433 }
434 Plan::Subscribe(subscribe) => (QueryPlan::Subscribe(subscribe), ExplainContext::None),
435 _ => {
436 soft_panic_or_log!(
439 "Unexpected plan kind in frontend peek sequencing: {:?}",
440 plan
441 );
442 debug!(
443 "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, side-effecting SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO S3"
444 );
445 return Ok(None);
446 }
447 };
448
449 let when = match query_plan {
450 QueryPlan::Select(s) => &s.when,
451 QueryPlan::CopyTo(s, _) => &s.when,
452 QueryPlan::Subscribe(s) => &s.when,
453 };
454
455 let depends_on = match query_plan {
456 QueryPlan::Select(s) => s.source.depends_on(),
457 QueryPlan::CopyTo(s, _) => s.source.depends_on(),
458 QueryPlan::Subscribe(s) => s.from.depends_on(),
459 };
460
461 let contains_temporal = match query_plan {
462 QueryPlan::Select(s) => s.source.contains_temporal(),
463 QueryPlan::CopyTo(s, _) => s.source.contains_temporal(),
464 QueryPlan::Subscribe(s) => Ok(s.from.contains_temporal()),
465 };
466
467 assert!(plan.allowed_in_read_only());
471
472 let (cluster, target_cluster_id, target_cluster_name) = {
473 let target_cluster = match session.transaction().cluster() {
474 Some(cluster_id) => TargetCluster::Transaction(cluster_id),
476 None => coord::catalog_serving::auto_run_on_catalog_server(
478 &conn_catalog,
479 session,
480 &plan,
481 ),
482 };
483 let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
484 (cluster, cluster.id, &cluster.name)
485 };
486
487 if let Some(logging_id) = &statement_logging_id {
489 self.log_set_cluster(*logging_id, target_cluster_id);
490 }
491
492 coord::catalog_serving::check_cluster_restrictions(
493 target_cluster_name.as_str(),
494 &conn_catalog,
495 &plan,
496 )?;
497
498 rbac::check_plan(
499 &conn_catalog,
500 None::<fn(u32) -> Option<RoleId>>,
503 session,
504 &plan,
505 Some(target_cluster_id),
506 &resolved_ids,
507 )?;
508
509 if let Some((_, wait_future)) =
510 coord::appends::waiting_on_startup_appends(&*catalog, session, &plan)
511 {
512 wait_future.await;
513 }
514
515 let max_query_result_size = Some(session.vars().max_query_result_size());
516
517 let compute_instance_snapshot =
522 ComputeInstanceSnapshot::new_without_collections(cluster.id());
523
524 let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config())
525 .override_from(&catalog.get_cluster(cluster.id()).config.features())
526 .override_from(&explain_ctx);
527
528 if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
529 return Err(AdapterError::NoClusterReplicasAvailable {
530 name: cluster.name.clone(),
531 is_managed: cluster.is_managed(),
532 });
533 }
534
535 let (_, view_id) = self.transient_id_gen.allocate_id();
536 let (_, index_id) = self.transient_id_gen.allocate_id();
537
538 let target_replica_name = session.vars().cluster_replica();
539 let mut target_replica = target_replica_name
540 .map(|name| {
541 cluster
542 .replica_id(name)
543 .ok_or(AdapterError::UnknownClusterReplica {
544 cluster_name: cluster.name.clone(),
545 replica_name: name.to_string(),
546 })
547 })
548 .transpose()?;
549
550 let source_ids = depends_on;
551 let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?;
555 if matches!(timeline_context, TimelineContext::TimestampIndependent) && contains_temporal? {
556 timeline_context = TimelineContext::TimestampDependent;
560 }
561
562 let notices = coord::sequencer::check_log_reads(
563 &catalog,
564 cluster,
565 &source_ids,
566 &mut target_replica,
567 session.vars(),
568 )?;
569 session.add_notices(notices);
570
571 let isolation_level = session.vars().transaction_isolation().clone();
574 let timeline = Coordinator::get_timeline(&timeline_context);
575 let needs_linearized_read_ts =
576 Coordinator::needs_linearized_read_ts(&isolation_level, when);
577
578 let oracle_read_ts = match timeline {
579 Some(timeline) if needs_linearized_read_ts => {
580 let oracle = self.ensure_oracle(timeline).await?;
581 let oracle_read_ts = oracle.read_ts().await;
582 Some(oracle_read_ts)
583 }
584 Some(_) | None => None,
585 };
586
587 let vars = session.vars();
590 let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
591 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
592 && !session.contains_read_timestamp()
593 {
594 self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
596 source_ids: source_ids.clone(),
597 real_time_recency_timeout: *vars.real_time_recency_timeout(),
598 tx,
599 })
600 .await?
601 } else {
602 None
603 };
604
605 let dataflow_builder =
608 DataflowBuilder::new(catalog.state(), compute_instance_snapshot.clone());
609 let input_id_bundle = dataflow_builder.sufficient_collections(source_ids.clone());
610
611 let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when)
624 && !matches!(query_plan, QueryPlan::Subscribe { .. });
625
626 let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
628 Some(
631 determination @ TimestampDetermination {
632 timestamp_context: TimestampContext::TimelineTimestamp { .. },
633 ..
634 },
635 ) if in_immediate_multi_stmt_txn => {
636 let txn_read_holds_opt = self
644 .call_coordinator(|tx| Command::GetTransactionReadHoldsBundle {
645 conn_id: session.conn_id().clone(),
646 tx,
647 })
648 .await;
649
650 if let Some(txn_read_holds) = txn_read_holds_opt {
651 let allowed_id_bundle = txn_read_holds.id_bundle();
652 let outside = input_id_bundle.difference(&allowed_id_bundle);
653
654 if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
656 let valid_names =
657 allowed_id_bundle.resolve_names(&*catalog, session.conn_id());
658 let invalid_names = outside.resolve_names(&*catalog, session.conn_id());
659 return Err(AdapterError::RelationOutsideTimeDomain {
660 relations: invalid_names,
661 names: valid_names,
662 });
663 }
664
665 let read_holds = txn_read_holds.subset(&input_id_bundle);
667
668 (determination, read_holds)
669 } else {
670 return Err(AdapterError::Internal(
675 "Missing transaction read holds for multi-statement transaction"
676 .to_string(),
677 ));
678 }
679 }
680 _ => {
681 let timedomain_bundle;
688 let determine_bundle = if in_immediate_multi_stmt_txn {
689 timedomain_bundle = timedomain_for(
693 &*catalog,
694 &dataflow_builder,
695 &source_ids,
696 &timeline_context,
697 session.conn_id(),
698 target_cluster_id,
699 )?;
700 &timedomain_bundle
701 } else {
702 &input_id_bundle
704 };
705 let (determination, read_holds) = self
706 .frontend_determine_timestamp(
707 session,
708 determine_bundle,
709 when,
710 target_cluster_id,
711 &timeline_context,
712 oracle_read_ts,
713 real_time_recency_ts,
714 )
715 .await?;
716
717 if in_immediate_multi_stmt_txn {
721 self.call_coordinator(|tx| Command::StoreTransactionReadHolds {
722 conn_id: session.conn_id().clone(),
723 read_holds: read_holds.clone(),
724 tx,
725 })
726 .await;
727 }
728
729 (determination, read_holds)
730 }
731 };
732
733 {
734 for id in input_id_bundle.iter() {
736 let s = read_holds.storage_holds.contains_key(&id);
737 let c = read_holds
738 .compute_ids()
739 .map(|(_instance, coll)| coll)
740 .contains(&id);
741 soft_assert_or_log!(
742 s || c,
743 "missing read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
744 id,
745 in_immediate_multi_stmt_txn,
746 );
747 }
748
749 for id in input_id_bundle.storage_ids.iter() {
752 soft_assert_or_log!(
753 read_holds.storage_holds.contains_key(id),
754 "missing storage read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
755 id,
756 in_immediate_multi_stmt_txn,
757 );
758 }
759 for id in input_id_bundle
760 .compute_ids
761 .iter()
762 .flat_map(|(_instance, colls)| colls)
763 {
764 soft_assert_or_log!(
765 read_holds
766 .compute_ids()
767 .map(|(_instance, coll)| coll)
768 .contains(id),
769 "missing compute read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
770 id,
771 in_immediate_multi_stmt_txn,
772 );
773 }
774 }
775
776 let requires_linearization = (&explain_ctx).into();
789 let mut transaction_determination = determination.clone();
790 match query_plan {
791 QueryPlan::Subscribe { .. } => {
792 if when.is_transactional() {
793 session.add_transaction_ops(TransactionOps::Subscribe)?;
794 }
795 }
796 QueryPlan::Select(..) | QueryPlan::CopyTo(..) => {
797 if when.is_transactional() {
798 session.add_transaction_ops(TransactionOps::Peeks {
799 determination: transaction_determination,
800 cluster_id: target_cluster_id,
801 requires_linearization,
802 })?;
803 } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
804 transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
806 session.add_transaction_ops(TransactionOps::Peeks {
807 determination: transaction_determination,
808 cluster_id: target_cluster_id,
809 requires_linearization,
810 })?;
811 }
812 }
813 }
814
815 let stats = statistics_oracle(
818 session,
819 &source_ids,
820 &determination.timestamp_context.antichain(),
821 true,
822 catalog.system_config(),
823 &*self.storage_collections,
824 )
825 .await
826 .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
827
828 let timestamp_context = determination.timestamp_context.clone();
831 let session_meta = session.meta();
832 let now = catalog.config().now.clone();
833 let target_cluster_name = target_cluster_name.clone();
834 let needs_plan_insights = explain_ctx.needs_plan_insights();
835 let determination_for_pushdown = if matches!(explain_ctx, ExplainContext::Pushdown) {
836 Some(determination.clone())
839 } else {
840 None
841 };
842
843 let span = Span::current();
844
845 let catalog_for_insights = if needs_plan_insights {
847 Some(Arc::clone(&catalog))
848 } else {
849 None
850 };
851 let mut compute_instances = BTreeMap::new();
852 if needs_plan_insights {
853 for user_cluster in catalog.user_clusters() {
854 let snapshot = ComputeInstanceSnapshot::new_without_collections(user_cluster.id);
855 compute_instances.insert(user_cluster.name.clone(), snapshot);
856 }
857 }
858
859 let source_ids_for_closure = source_ids.clone();
860
861 let optimization_future: JoinHandle<Result<_, AdapterError>> = match query_plan {
862 QueryPlan::CopyTo(select_plan, mut copy_to_ctx) => {
863 let raw_expr = select_plan.source.clone();
864
865 let worker_counts = cluster.replicas().map(|r| {
867 let loc = &r.config.location;
868 loc.workers().unwrap_or_else(|| loc.num_processes())
869 });
870 let max_worker_count = match worker_counts.max() {
871 Some(count) => u64::cast_from(count),
872 None => {
873 return Err(AdapterError::NoClusterReplicasAvailable {
874 name: cluster.name.clone(),
875 is_managed: cluster.is_managed(),
876 });
877 }
878 };
879 copy_to_ctx.output_batch_count = Some(max_worker_count);
880
881 let mut optimizer = optimize::copy_to::Optimizer::new(
882 Arc::clone(&catalog),
883 compute_instance_snapshot,
884 view_id,
885 copy_to_ctx,
886 optimizer_config,
887 self.optimizer_metrics.clone(),
888 );
889
890 mz_ore::task::spawn_blocking(
891 || "optimize copy-to",
892 move || {
893 span.in_scope(|| {
894 let _dispatch_guard = explain_ctx.dispatch_guard();
895
896 let local_mir_plan =
899 optimizer.catch_unwind_optimize(raw_expr.clone())?;
900 let local_mir_plan = local_mir_plan.resolve(
902 timestamp_context.clone(),
903 &session_meta,
904 stats,
905 );
906 let global_lir_plan =
908 optimizer.catch_unwind_optimize(local_mir_plan)?;
909 Ok(Execution::CopyToS3 {
910 global_lir_plan,
911 source_ids: source_ids_for_closure,
912 })
913 })
914 },
915 )
916 }
917 QueryPlan::Select(select_plan) => {
918 let select_plan = select_plan.clone();
919 let raw_expr = select_plan.source.clone();
920
921 let mut optimizer = optimize::peek::Optimizer::new(
923 Arc::clone(&catalog),
924 compute_instance_snapshot,
925 select_plan.finishing.clone(),
926 view_id,
927 index_id,
928 optimizer_config,
929 self.optimizer_metrics.clone(),
930 );
931
932 mz_ore::task::spawn_blocking(
933 || "optimize peek",
934 move || {
935 span.in_scope(|| {
936 let _dispatch_guard = explain_ctx.dispatch_guard();
937
938 let pipeline = || {
945 let local_mir_plan =
946 optimizer.catch_unwind_optimize(raw_expr.clone())?;
947 let local_mir_plan = local_mir_plan.resolve(
949 timestamp_context.clone(),
950 &session_meta,
951 stats,
952 );
953 let global_lir_plan =
955 optimizer.catch_unwind_optimize(local_mir_plan)?;
956 Ok::<_, AdapterError>(global_lir_plan)
957 };
958
959 let global_lir_plan_result = pipeline();
960 let optimization_finished_at = now();
961
962 let create_insights_ctx =
963 |optimizer: &optimize::peek::Optimizer,
964 is_notice: bool|
965 -> Option<Box<PlanInsightsContext>> {
966 if !needs_plan_insights {
967 return None;
968 }
969
970 let catalog = catalog_for_insights.as_ref()?;
971
972 let enable_re_optimize = if needs_plan_insights {
973 let dyncfgs = catalog.system_config().dyncfgs();
981 let opt_limit = mz_adapter_types::dyncfgs
982 ::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
983 .get(dyncfgs);
984 !(is_notice && optimizer.duration() > opt_limit)
985 } else {
986 false
987 };
988
989 Some(Box::new(PlanInsightsContext {
990 stmt: select_plan
991 .select
992 .as_deref()
993 .map(Clone::clone)
994 .map(Statement::Select),
995 raw_expr: raw_expr.clone(),
996 catalog: Arc::clone(catalog),
997 compute_instances,
998 target_instance: target_cluster_name,
999 metrics: optimizer.metrics().clone(),
1000 finishing: optimizer.finishing().clone(),
1001 optimizer_config: optimizer.config().clone(),
1002 session: session_meta,
1003 timestamp_context,
1004 view_id: optimizer.select_id(),
1005 index_id: optimizer.index_id(),
1006 enable_re_optimize,
1007 }))
1008 };
1009
1010 let global_lir_plan = match global_lir_plan_result {
1011 Ok(plan) => plan,
1012 Err(err) => {
1013 let result = if let ExplainContext::Plan(explain_ctx) =
1014 explain_ctx
1015 && explain_ctx.broken
1016 {
1017 tracing::error!(
1019 "error while handling EXPLAIN statement: {}",
1020 err
1021 );
1022 Ok(Execution::ExplainPlan {
1023 df_meta: Default::default(),
1024 explain_ctx,
1025 optimizer,
1026 insights_ctx: None,
1027 })
1028 } else {
1029 Err(err)
1030 };
1031 return result;
1032 }
1033 };
1034
1035 match explain_ctx {
1036 ExplainContext::Plan(explain_ctx) => {
1037 let (_, df_meta, _) = global_lir_plan.unapply();
1038 let insights_ctx = create_insights_ctx(&optimizer, false);
1039 Ok(Execution::ExplainPlan {
1040 df_meta,
1041 explain_ctx,
1042 optimizer,
1043 insights_ctx,
1044 })
1045 }
1046 ExplainContext::None => Ok(Execution::Peek {
1047 global_lir_plan,
1048 optimization_finished_at,
1049 plan_insights_optimizer_trace: None,
1050 finishing: select_plan.finishing,
1051 copy_to: select_plan.copy_to,
1052 insights_ctx: None,
1053 }),
1054 ExplainContext::PlanInsightsNotice(optimizer_trace) => {
1055 let insights_ctx = create_insights_ctx(&optimizer, true);
1056 Ok(Execution::Peek {
1057 global_lir_plan,
1058 optimization_finished_at,
1059 plan_insights_optimizer_trace: Some(optimizer_trace),
1060 finishing: select_plan.finishing,
1061 copy_to: select_plan.copy_to,
1062 insights_ctx,
1063 })
1064 }
1065 ExplainContext::Pushdown => {
1066 let (plan, _, _) = global_lir_plan.unapply();
1067 let imports = match plan {
1068 PeekPlan::SlowPath(plan) => plan
1069 .desc
1070 .source_imports
1071 .into_iter()
1072 .filter_map(|(id, import)| {
1073 import.desc.arguments.operators.map(|mfp| (id, mfp))
1074 })
1075 .collect(),
1076 PeekPlan::FastPath(_) => {
1077 std::collections::BTreeMap::default()
1078 }
1079 };
1080 Ok(Execution::ExplainPushdown {
1081 imports,
1082 determination: determination_for_pushdown
1083 .expect("it's present for the ExplainPushdown case"),
1084 })
1085 }
1086 }
1087 })
1088 },
1089 )
1090 }
1091 QueryPlan::Subscribe(plan) => {
1092 let plan = plan.clone();
1093 let catalog: Arc<Catalog> = Arc::clone(&catalog);
1094 let debug_name = format!("subscribe-{}", index_id);
1095 let mut optimizer = optimize::subscribe::Optimizer::new(
1096 catalog,
1097 compute_instance_snapshot.clone(),
1098 view_id,
1099 index_id,
1100 plan.with_snapshot,
1101 plan.up_to,
1102 debug_name,
1103 optimizer_config,
1104 self.optimizer_metrics.clone(),
1105 );
1106 mz_ore::task::spawn_blocking(
1107 || "optimize subscribe",
1108 move || {
1109 span.in_scope(|| {
1110 let _dispatch_guard = explain_ctx.dispatch_guard();
1111
1112 let global_mir_plan = optimizer.catch_unwind_optimize(plan.clone())?;
1113 let as_of = timestamp_context.timestamp_or_default();
1114
1115 if let Some(up_to) = optimizer.up_to() {
1116 if as_of > up_to {
1117 return Err(AdapterError::AbsurdSubscribeBounds {
1118 as_of,
1119 up_to,
1120 });
1121 }
1122 }
1123 let local_mir_plan =
1124 global_mir_plan.resolve(Antichain::from_elem(as_of));
1125
1126 let global_lir_plan =
1127 optimizer.catch_unwind_optimize(local_mir_plan)?;
1128 let optimization_finished_at = now();
1129
1130 let (df_desc, df_meta) = global_lir_plan.unapply();
1131 Ok(Execution::Subscribe {
1132 subscribe_plan: plan,
1133 df_desc,
1134 df_meta,
1135 optimization_finished_at,
1136 })
1137 })
1138 },
1139 )
1140 }
1141 };
1142
1143 let optimization_timeout = *session.vars().statement_timeout();
1144 let optimization_result =
1145 match tokio::time::timeout(optimization_timeout, optimization_future).await {
1150 Ok(Ok(result)) => result,
1151 Ok(Err(AdapterError::Optimizer(err))) => {
1152 return Err(AdapterError::Internal(format!(
1153 "internal error in optimizer: {}",
1154 err
1155 )));
1156 }
1157 Ok(Err(err)) => {
1158 return Err(err);
1159 }
1160 Err(_elapsed) => {
1161 warn!("optimize peek timed out after {:?}", optimization_timeout);
1162 return Err(AdapterError::StatementTimeout);
1163 }
1164 };
1165
1166 if let Some(logging_id) = &statement_logging_id {
1168 self.log_lifecycle_event(*logging_id, StatementLifecycleEvent::OptimizationFinished);
1169 }
1170
1171 Self::assert_read_holds_correct(
1173 &read_holds,
1174 &optimization_result,
1175 &determination,
1176 target_cluster_id,
1177 in_immediate_multi_stmt_txn,
1178 );
1179
1180 match optimization_result {
1182 Execution::ExplainPlan {
1183 df_meta,
1184 explain_ctx,
1185 optimizer,
1186 insights_ctx,
1187 } => {
1188 let rows = coord::sequencer::explain_plan_inner(
1189 session,
1190 &catalog,
1191 df_meta,
1192 explain_ctx,
1193 optimizer,
1194 insights_ctx,
1195 )
1196 .await?;
1197
1198 Ok(Some(ExecuteResponse::SendingRowsImmediate {
1199 rows: Box::new(rows.into_row_iter()),
1200 }))
1201 }
1202 Execution::ExplainPushdown {
1203 imports,
1204 determination,
1205 } => {
1206 let as_of = determination.timestamp_context.antichain();
1209 let mz_now = determination
1210 .timestamp_context
1211 .timestamp()
1212 .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1213 .unwrap_or_else(ResultSpec::value_all);
1214
1215 Ok(Some(
1216 coord::sequencer::explain_pushdown_future_inner(
1217 session,
1218 &*catalog,
1219 &self.storage_collections,
1220 as_of,
1221 mz_now,
1222 imports,
1223 )
1224 .await
1225 .await?,
1226 ))
1227 }
1228 Execution::Peek {
1229 global_lir_plan,
1230 optimization_finished_at: _optimization_finished_at,
1231 plan_insights_optimizer_trace,
1232 finishing,
1233 copy_to,
1234 insights_ctx,
1235 } => {
1236 let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
1241
1242 coord::sequencer::emit_optimizer_notices(
1243 &*catalog,
1244 session,
1245 &df_meta.optimizer_notices,
1246 );
1247
1248 if let Some(trace) = plan_insights_optimizer_trace {
1250 let target_cluster = catalog.get_cluster(target_cluster_id);
1251 let features = OptimizerFeatures::from(catalog.system_config())
1252 .override_from(&target_cluster.config.features());
1253 let insights = trace
1254 .into_plan_insights(
1255 &features,
1256 &catalog.for_session(session),
1257 Some(finishing.clone()),
1258 Some(target_cluster),
1259 df_meta.clone(),
1260 insights_ctx,
1261 )
1262 .await?;
1263 session.add_notice(AdapterNotice::PlanInsights(insights));
1264 }
1265
1266 let watch_set = statement_logging_id.map(|logging_id| {
1269 WatchSetCreation::new(
1270 logging_id,
1271 catalog.state(),
1272 &input_id_bundle,
1273 determination.timestamp_context.timestamp_or_default(),
1274 )
1275 });
1276
1277 let max_result_size = catalog.system_config().max_result_size();
1278
1279 let determination_for_notice = if session.vars().emit_timestamp_notice() {
1282 Some(determination.clone())
1283 } else {
1284 None
1285 };
1286
1287 let response = match peek_plan {
1288 PeekPlan::FastPath(fast_path_plan) => {
1289 if let Some(logging_id) = &statement_logging_id {
1290 if !matches!(fast_path_plan, FastPathPlan::Constant(..)) {
1304 self.log_set_timestamp(
1305 *logging_id,
1306 determination.timestamp_context.timestamp_or_default(),
1307 );
1308 }
1309 }
1310
1311 let row_set_finishing_seconds =
1312 session.metrics().row_set_finishing_seconds().clone();
1313
1314 let peek_stash_read_batch_size_bytes =
1315 mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
1316 .get(catalog.system_config().dyncfgs());
1317 let peek_stash_read_memory_budget_bytes =
1318 mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
1319 .get(catalog.system_config().dyncfgs());
1320
1321 self.implement_fast_path_peek_plan(
1322 fast_path_plan,
1323 determination.timestamp_context.timestamp_or_default(),
1324 finishing,
1325 target_cluster_id,
1326 target_replica,
1327 typ,
1328 max_result_size,
1329 max_query_result_size,
1330 row_set_finishing_seconds,
1331 read_holds,
1332 peek_stash_read_batch_size_bytes,
1333 peek_stash_read_memory_budget_bytes,
1334 session.conn_id().clone(),
1335 source_ids,
1336 watch_set,
1337 )
1338 .await?
1339 }
1340 PeekPlan::SlowPath(dataflow_plan) => {
1341 if let Some(logging_id) = &statement_logging_id {
1342 self.log_set_transient_index_id(*logging_id, dataflow_plan.id);
1343 }
1344
1345 self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
1346 dataflow_plan: Box::new(dataflow_plan),
1347 determination,
1348 finishing,
1349 compute_instance: target_cluster_id,
1350 target_replica,
1351 intermediate_result_type: typ,
1352 source_ids,
1353 conn_id: session.conn_id().clone(),
1354 max_result_size,
1355 max_query_result_size,
1356 watch_set,
1357 tx,
1358 })
1359 .await?
1360 }
1361 };
1362
1363 if let Some(determination) = determination_for_notice {
1365 let explanation = self
1366 .call_coordinator(|tx| Command::ExplainTimestamp {
1367 conn_id: session.conn_id().clone(),
1368 session_wall_time: session.pcx().wall_time,
1369 cluster_id: target_cluster_id,
1370 id_bundle: input_id_bundle.clone(),
1371 determination,
1372 tx,
1373 })
1374 .await;
1375 session.add_notice(AdapterNotice::QueryTimestamp { explanation });
1376 }
1377
1378 Ok(Some(match copy_to {
1379 None => response,
1380 Some(format) => ExecuteResponse::CopyTo {
1382 format,
1383 resp: Box::new(response),
1384 },
1385 }))
1386 }
1387 Execution::Subscribe {
1388 subscribe_plan,
1389 df_desc,
1390 df_meta,
1391 optimization_finished_at: _optimization_finished_at,
1392 } => {
1393 if df_desc.as_of.as_ref().expect("as of set") == &df_desc.until {
1394 session.add_notice(AdapterNotice::EqualSubscribeBounds {
1395 bound: *df_desc.until.as_option().expect("as of set"),
1396 });
1397 }
1398 coord::sequencer::emit_optimizer_notices(
1399 &*catalog,
1400 session,
1401 &df_meta.optimizer_notices,
1402 );
1403
1404 let response = self
1405 .call_coordinator(|tx| Command::ExecuteSubscribe {
1406 df_desc,
1407 dependency_ids: subscribe_plan.from.depends_on(),
1408 cluster_id: target_cluster_id,
1409 replica_id: target_replica,
1410 conn_id: session.conn_id().clone(),
1411 session_uuid: session.uuid(),
1412 read_holds,
1413 plan: subscribe_plan,
1414 statement_logging_id,
1415 tx,
1416 })
1417 .await?;
1418 Ok(Some(response))
1419 }
1420 Execution::CopyToS3 {
1421 global_lir_plan,
1422 source_ids,
1423 } => {
1424 let (df_desc, df_meta) = global_lir_plan.unapply();
1425
1426 coord::sequencer::emit_optimizer_notices(
1427 &*catalog,
1428 session,
1429 &df_meta.optimizer_notices,
1430 );
1431
1432 let sink_id = df_desc.sink_id();
1434 let sinks = &df_desc.sink_exports;
1435 if sinks.len() != 1 {
1436 return Err(AdapterError::Internal(
1437 "expected exactly one copy to s3 sink".into(),
1438 ));
1439 }
1440 let (_, sink_desc) = sinks
1441 .first_key_value()
1442 .expect("known to be exactly one copy to s3 sink");
1443 let s3_sink_connection = match &sink_desc.connection {
1444 mz_compute_types::sinks::ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1445 conn.clone()
1446 }
1447 _ => {
1448 return Err(AdapterError::Internal(
1449 "expected copy to s3 oneshot sink".into(),
1450 ));
1451 }
1452 };
1453
1454 self.call_coordinator(|tx| Command::CopyToPreflight {
1457 s3_sink_connection,
1458 sink_id,
1459 tx,
1460 })
1461 .await?;
1462
1463 let watch_set = statement_logging_id.map(|logging_id| {
1465 WatchSetCreation::new(
1466 logging_id,
1467 catalog.state(),
1468 &input_id_bundle,
1469 determination.timestamp_context.timestamp_or_default(),
1470 )
1471 });
1472
1473 let response = self
1474 .call_coordinator(|tx| Command::ExecuteCopyTo {
1475 df_desc: Box::new(df_desc),
1476 compute_instance: target_cluster_id,
1477 target_replica,
1478 source_ids,
1479 conn_id: session.conn_id().clone(),
1480 watch_set,
1481 tx,
1482 })
1483 .await?;
1484
1485 Ok(Some(response))
1486 }
1487 }
1488 }
1489
1490 pub(crate) async fn frontend_determine_timestamp(
1497 &mut self,
1498 session: &Session,
1499 id_bundle: &CollectionIdBundle,
1500 when: &QueryWhen,
1501 compute_instance: ComputeInstanceId,
1502 timeline_context: &TimelineContext,
1503 oracle_read_ts: Option<Timestamp>,
1504 real_time_recency_ts: Option<Timestamp>,
1505 ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
1506 let isolation_level = session.vars().transaction_isolation();
1509
1510 let (read_holds, upper) = self
1511 .acquire_read_holds_and_least_valid_write(id_bundle)
1512 .await
1513 .map_err(|err| {
1514 AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
1515 err,
1516 compute_instance,
1517 )
1518 })?;
1519 let (det, read_holds) = <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1520 session,
1521 id_bundle,
1522 when,
1523 timeline_context,
1524 oracle_read_ts,
1525 real_time_recency_ts,
1526 isolation_level,
1527 read_holds,
1528 upper.clone(),
1529 )?;
1530
1531 session
1532 .metrics()
1533 .determine_timestamp(&[
1534 match det.respond_immediately() {
1535 true => "true",
1536 false => "false",
1537 },
1538 isolation_level.as_str(),
1539 &compute_instance.to_string(),
1540 ])
1541 .inc();
1542 if !det.respond_immediately()
1543 && isolation_level == &IsolationLevel::StrictSerializable
1544 && real_time_recency_ts.is_none()
1545 {
1546 if let Some(strict) = det.timestamp_context.timestamp() {
1548 let (serializable_det, _tmp_read_holds) =
1549 <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1550 session,
1551 id_bundle,
1552 when,
1553 timeline_context,
1554 oracle_read_ts,
1555 real_time_recency_ts,
1556 &IsolationLevel::Serializable,
1557 read_holds.clone(),
1558 upper,
1559 )?;
1560 if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
1561 session
1562 .metrics()
1563 .timestamp_difference_for_strict_serializable_ms(&[compute_instance
1564 .to_string()
1565 .as_ref()])
1566 .observe(f64::cast_lossy(u64::from(
1567 strict.saturating_sub(*serializable),
1568 )));
1569 }
1570 }
1571 }
1572
1573 Ok((det, read_holds))
1574 }
1575
1576 fn assert_read_holds_correct(
1577 read_holds: &ReadHolds<Timestamp>,
1578 execution: &Execution,
1579 determination: &TimestampDetermination<Timestamp>,
1580 target_cluster_id: ClusterId,
1581 in_immediate_multi_stmt_txn: bool,
1582 ) {
1583 let (source_imports, index_imports, as_of, execution_name): (
1585 Vec<GlobalId>,
1586 Vec<GlobalId>,
1587 Timestamp,
1588 &str,
1589 ) = match execution {
1590 Execution::Peek {
1591 global_lir_plan, ..
1592 } => match global_lir_plan.peek_plan() {
1593 PeekPlan::FastPath(fast_path_plan) => {
1594 let (sources, indexes) = match fast_path_plan {
1595 FastPathPlan::Constant(..) => (vec![], vec![]),
1596 FastPathPlan::PeekExisting(_coll_id, idx_id, ..) => (vec![], vec![*idx_id]),
1597 FastPathPlan::PeekPersist(global_id, ..) => (vec![*global_id], vec![]),
1598 };
1599 (
1600 sources,
1601 indexes,
1602 determination.timestamp_context.timestamp_or_default(),
1603 "FastPath",
1604 )
1605 }
1606 PeekPlan::SlowPath(dataflow_plan) => {
1607 let as_of = dataflow_plan
1608 .desc
1609 .as_of
1610 .clone()
1611 .expect("dataflow has an as_of")
1612 .into_element();
1613 (
1614 dataflow_plan.desc.source_imports.keys().cloned().collect(),
1615 dataflow_plan.desc.index_imports.keys().cloned().collect(),
1616 as_of,
1617 "SlowPath",
1618 )
1619 }
1620 },
1621 Execution::CopyToS3 {
1622 global_lir_plan, ..
1623 } => {
1624 let df_desc = global_lir_plan.df_desc();
1625 let as_of = df_desc
1626 .as_of
1627 .clone()
1628 .expect("dataflow has an as_of")
1629 .into_element();
1630 (
1631 df_desc.source_imports.keys().cloned().collect(),
1632 df_desc.index_imports.keys().cloned().collect(),
1633 as_of,
1634 "CopyToS3",
1635 )
1636 }
1637 Execution::ExplainPlan { .. } | Execution::ExplainPushdown { .. } => {
1638 return;
1640 }
1641 Execution::Subscribe { df_desc, .. } => {
1642 let as_of = df_desc
1643 .as_of
1644 .clone()
1645 .expect("dataflow has an as_of")
1646 .into_element();
1647 (
1648 df_desc.source_imports.keys().cloned().collect(),
1649 df_desc.index_imports.keys().cloned().collect(),
1650 as_of,
1651 "Subscribe",
1652 )
1653 }
1654 };
1655
1656 for id in source_imports.iter() {
1658 soft_assert_or_log!(
1659 read_holds.storage_holds.contains_key(id),
1660 "[{}] missing read hold for the source import {}; (in_immediate_multi_stmt_txn: {})",
1661 execution_name,
1662 id,
1663 in_immediate_multi_stmt_txn,
1664 );
1665 }
1666 for id in index_imports.iter() {
1667 soft_assert_or_log!(
1668 read_holds
1669 .compute_ids()
1670 .map(|(_instance, coll)| coll)
1671 .contains(id),
1672 "[{}] missing read hold for the index import {}; (in_immediate_multi_stmt_txn: {})",
1673 execution_name,
1674 id,
1675 in_immediate_multi_stmt_txn,
1676 );
1677 }
1678
1679 for (id, h) in read_holds.storage_holds.iter() {
1681 soft_assert_or_log!(
1682 h.since().less_equal(&as_of),
1683 "[{}] storage read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1684 execution_name,
1685 h.since(),
1686 id,
1687 as_of,
1688 determination,
1689 in_immediate_multi_stmt_txn,
1690 );
1691 }
1692 for ((instance, id), h) in read_holds.compute_holds.iter() {
1693 soft_assert_eq_or_log!(
1694 *instance,
1695 target_cluster_id,
1696 "[{}] the read hold on {} is on the wrong cluster; (in_immediate_multi_stmt_txn: {})",
1697 execution_name,
1698 id,
1699 in_immediate_multi_stmt_txn,
1700 );
1701 soft_assert_or_log!(
1702 h.since().less_equal(&as_of),
1703 "[{}] compute read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1704 execution_name,
1705 h.since(),
1706 id,
1707 as_of,
1708 determination,
1709 in_immediate_multi_stmt_txn,
1710 );
1711 }
1712 }
1713}
1714
/// The kind of statement being executed, together with the plan data that kind of execution
/// needs.
///
/// `assert_read_holds_correct` validates read holds for `Peek`, `Subscribe`, and `CopyToS3`,
/// and returns early for the two explain variants.
enum Execution {
    /// A peek. `global_lir_plan.peek_plan()` yields either a `PeekPlan::FastPath` or a
    /// `PeekPlan::SlowPath`.
    Peek {
        /// The peek plan to run.
        global_lir_plan: optimize::peek::GlobalLirPlan,
        /// NOTE(review): presumably the time at which optimization finished; its use is not
        /// visible in this part of the file — confirm.
        optimization_finished_at: EpochMillis,
        /// NOTE(review): when `Some`, this appears to be turned into plan insights and
        /// attached to the session as an `AdapterNotice::PlanInsights` — confirm against the
        /// peek arm of the executing function.
        plan_insights_optimizer_trace: Option<OptimizerTrace>,
        /// Finishing forwarded to the fast-path / slow-path peek implementation.
        finishing: RowSetFinishing,
        /// NOTE(review): when `Some`, the peek response appears to be wrapped in
        /// `ExecuteResponse::CopyTo` with this format — confirm.
        copy_to: Option<plan::CopyFormat>,
        /// NOTE(review): presumably extra context consumed when building plan insights —
        /// confirm.
        insights_ctx: Option<Box<PlanInsightsContext>>,
    },
    /// A subscribe, executed by sending `Command::ExecuteSubscribe` to the coordinator.
    Subscribe {
        /// Forwarded as `plan`; `subscribe_plan.from.depends_on()` supplies the dependency ids.
        subscribe_plan: SubscribePlan,
        /// Dataflow to install. Its `as_of` is expected to be set; if it equals `until`, an
        /// `EqualSubscribeBounds` notice is added to the session.
        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
        /// Its `optimizer_notices` are emitted to the session before execution.
        df_meta: DataflowMetainfo,
        /// Not read by the subscribe arm of the executing function (bound with a leading
        /// underscore there).
        optimization_finished_at: EpochMillis,
    },
    /// A copy-to-S3, executed via `Command::CopyToPreflight` followed by
    /// `Command::ExecuteCopyTo`.
    CopyToS3 {
        /// `unapply()` yields the dataflow description and its metainfo. The description must
        /// export exactly one sink with a `CopyToS3Oneshot` connection, otherwise execution
        /// fails with `AdapterError::Internal`.
        global_lir_plan: optimize::copy_to::GlobalLirPlan,
        /// Forwarded unchanged to `Command::ExecuteCopyTo`.
        source_ids: BTreeSet<GlobalId>,
    },
    /// An explain of a plan. NOTE(review): its handling is not visible in this part of the
    /// file; the field notes are limited to what the types show.
    ExplainPlan {
        df_meta: DataflowMetainfo,
        explain_ctx: ExplainPlanContext,
        optimizer: optimize::peek::Optimizer,
        insights_ctx: Option<Box<PlanInsightsContext>>,
    },
    /// An explain of filter pushdown. NOTE(review): its handling is not visible in this part
    /// of the file.
    ExplainPushdown {
        /// A `MapFilterProject` per imported collection id.
        imports: BTreeMap<GlobalId, mz_expr::MapFilterProject>,
        determination: TimestampDetermination<Timestamp>,
    },
}