1use std::collections::BTreeMap;
11use std::collections::BTreeSet;
12use std::sync::Arc;
13use std::time::Duration;
14
15use itertools::Itertools;
16use mz_adapter_types::dyncfgs::ENABLE_FRONTEND_SUBSCRIBES;
17use mz_compute_types::ComputeInstanceId;
18use mz_compute_types::dataflows::DataflowDescription;
19use mz_controller_types::ClusterId;
20use mz_expr::{CollectionPlan, ResultSpec, RowSetFinishing};
21use mz_ore::cast::{CastFrom, CastLossy};
22use mz_ore::collections::CollectionExt;
23use mz_ore::now::EpochMillis;
24use mz_ore::task::JoinHandle;
25use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
26use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
27use mz_repr::role_id::RoleId;
28use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
29use mz_sql::ast::Raw;
30use mz_sql::catalog::CatalogCluster;
31use mz_sql::plan::Params;
32use mz_sql::plan::{
33 self, Explainee, ExplaineeStatement, Plan, QueryWhen, SelectPlan, SubscribePlan,
34};
35use mz_sql::rbac;
36use mz_sql::session::metadata::SessionMetadata;
37use mz_sql::session::vars::IsolationLevel;
38use mz_sql_parser::ast::{CopyDirection, ExplainStage, ShowStatement, Statement};
39use mz_transform::EmptyStatisticsOracle;
40use mz_transform::dataflow::DataflowMetainfo;
41use opentelemetry::trace::TraceContextExt;
42use timely::progress::Antichain;
43use tracing::{Span, debug, warn};
44use tracing_opentelemetry::OpenTelemetrySpanExt;
45
46use crate::catalog::Catalog;
47use crate::command::Command;
48use crate::coord::peek::{FastPathPlan, PeekPlan};
49use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle};
50use crate::coord::timeline::timedomain_for;
51use crate::coord::timestamp_selection::TimestampDetermination;
52use crate::coord::{
53 Coordinator, CopyToContext, ExecuteContextGuard, ExplainContext, ExplainPlanContext,
54 TargetCluster,
55};
56use crate::explain::insights::PlanInsightsContext;
57use crate::explain::optimizer_trace::OptimizerTrace;
58use crate::optimize::Optimize;
59use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
60use crate::session::{Session, TransactionOps, TransactionStatus};
61use crate::statement_logging::WatchSetCreation;
62use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
63use crate::{
64 AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, PeekClient, ReadHolds,
65 TimelineContext, TimestampContext, TimestampProvider, optimize,
66};
67use crate::{coord, metrics};
68
69impl PeekClient {
70 pub(crate) async fn try_frontend_peek(
78 &mut self,
79 portal_name: &str,
80 catalog: Option<Arc<Catalog>>,
81 session: &mut Session,
82 outer_ctx_extra: &mut Option<ExecuteContextGuard>,
83 ) -> Result<Option<ExecuteResponse>, AdapterError> {
84 if session.vars().emit_trace_id_notice() {
87 let span_context = tracing::Span::current()
88 .context()
89 .span()
90 .span_context()
91 .clone();
92 if span_context.is_valid() {
93 session.add_notice(AdapterNotice::QueryTrace {
94 trace_id: span_context.trace_id(),
95 });
96 }
97 }
98
99 let catalog = match catalog {
106 Some(c) => c,
107 None => self.catalog_snapshot("try_frontend_peek").await,
108 };
109
110 let (stmt, params, logging, lifecycle_timestamps) = {
112 if let Err(err) = Coordinator::verify_portal(&*catalog, session, portal_name) {
113 outer_ctx_extra
114 .take()
115 .and_then(|guard| guard.defuse().retire());
116 return Err(err);
117 }
118 let portal = session
119 .get_portal_unverified(portal_name)
120 .expect("called verify_portal above");
123 let params = portal.parameters.clone();
124 let stmt = portal.stmt.clone();
125 let logging = Arc::clone(&portal.logging);
126 let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
127 (stmt, params, logging, lifecycle_timestamps)
128 };
129
130 if let Some(ref stmt) = stmt {
133 match &**stmt {
134 Statement::Select(_)
135 | Statement::ExplainAnalyzeObject(_)
136 | Statement::ExplainAnalyzeCluster(_)
137 | Statement::Show(ShowStatement::ShowObjects(_))
138 | Statement::Show(ShowStatement::ShowColumns(_)) => {
139 }
144 Statement::ExplainPlan(explain_stmt) => {
145 match &explain_stmt.explainee {
150 mz_sql_parser::ast::Explainee::Select(..) => {
151 }
153 _ => {
154 debug!(
155 "Bailing out from try_frontend_peek, because EXPLAIN is not for a SELECT query"
156 );
157 return Ok(None);
158 }
159 }
160 }
161 Statement::ExplainPushdown(explain_stmt) => {
162 match &explain_stmt.explainee {
164 mz_sql_parser::ast::Explainee::Select(_, false) => {}
165 _ => {
166 debug!(
167 "Bailing out from try_frontend_peek, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is for EXPLAIN BROKEN"
168 );
169 return Ok(None);
170 }
171 }
172 }
173 Statement::Copy(copy_stmt) => {
174 match ©_stmt.direction {
175 CopyDirection::To => {
176 }
178 CopyDirection::From => {
179 debug!(
180 "Bailing out from try_frontend_peek, because COPY FROM is not supported"
181 );
182 return Ok(None);
183 }
184 }
185 }
186
187 Statement::Subscribe(_)
188 if ENABLE_FRONTEND_SUBSCRIBES.get(catalog.system_config().dyncfgs()) =>
189 {
190 }
192 _ => {
193 debug!(
194 "Bailing out from try_frontend_peek, because statement type is not supported"
195 );
196 return Ok(None);
197 }
198 }
199 }
200
201 let statement_logging_id = if outer_ctx_extra.is_none() {
204 let result = self.statement_logging_frontend.begin_statement_execution(
206 session,
207 ¶ms,
208 &logging,
209 catalog.system_config(),
210 lifecycle_timestamps,
211 );
212
213 if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result {
214 self.log_began_execution(began_execution, mseh_update, prepared_statement);
215 Some(logging_id)
216 } else {
217 None
218 }
219 } else {
220 outer_ctx_extra
227 .take()
228 .and_then(|guard| guard.defuse().retire())
229 };
230
231 let result = self
232 .try_frontend_peek_inner(session, catalog, stmt, params, statement_logging_id)
233 .await;
234
235 if let Some(logging_id) = statement_logging_id {
238 let reason = match &result {
239 Ok(Some(
241 ExecuteResponse::SendingRowsStreaming { .. }
242 | ExecuteResponse::Subscribing { .. },
243 )) => {
244 return result;
247 }
248 Ok(Some(resp @ ExecuteResponse::CopyTo { resp: inner, .. })) => {
250 match inner.as_ref() {
251 ExecuteResponse::SendingRowsStreaming { .. }
252 | ExecuteResponse::Subscribing { .. } => {
253 return result;
256 }
257 _ => resp.into(),
259 }
260 }
261 Ok(None) => {
263 soft_panic_or_log!(
264 "Bailed out from `try_frontend_peek_inner` after we already logged the beginning of statement execution."
265 );
266 self.log_ended_execution(
269 logging_id,
270 StatementEndedExecutionReason::Errored {
271 error: "Internal error: bailed out from `try_frontend_peek_inner`"
272 .to_string(),
273 },
274 );
275 return result;
276 }
277 Ok(Some(resp)) => resp.into(),
282 Err(e) => StatementEndedExecutionReason::Errored {
283 error: e.to_string(),
284 },
285 };
286
287 self.log_ended_execution(logging_id, reason);
288 }
289
290 result
291 }
292
293 async fn try_frontend_peek_inner(
296 &mut self,
297 session: &mut Session,
298 catalog: Arc<Catalog>,
299 stmt: Option<Arc<Statement<Raw>>>,
300 params: Params,
301 statement_logging_id: Option<crate::statement_logging::StatementLoggingId>,
302 ) -> Result<Option<ExecuteResponse>, AdapterError> {
303 let stmt = match stmt {
304 Some(stmt) => stmt,
305 None => {
306 debug!("try_frontend_peek_inner succeeded on an empty query");
307 return Ok(Some(ExecuteResponse::EmptyQuery));
308 }
309 };
310
311 session
312 .metrics()
313 .query_total(&[
314 metrics::session_type_label_value(session.user()),
315 metrics::statement_type_label_value(&stmt),
316 ])
317 .inc();
318
319 let conn_catalog = catalog.for_session(session);
322 let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?;
325
326 let pcx = session.pcx();
327 let plan = mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, ¶ms, &resolved_ids)?;
328
329 enum QueryPlan<'a> {
331 Select(&'a SelectPlan),
332 CopyTo(&'a SelectPlan, CopyToContext),
333 Subscribe(&'a SubscribePlan),
334 }
335
336 let (query_plan, explain_ctx) = match &plan {
337 Plan::Select(select_plan) => {
338 let explain_ctx = if session.vars().emit_plan_insights_notice() {
339 let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
340 ExplainContext::PlanInsightsNotice(optimizer_trace)
341 } else {
342 ExplainContext::None
343 };
344 (QueryPlan::Select(select_plan), explain_ctx)
345 }
346 Plan::ShowColumns(show_columns_plan) => {
347 (
349 QueryPlan::Select(&show_columns_plan.select_plan),
350 ExplainContext::None,
351 )
352 }
353 Plan::ExplainPlan(plan::ExplainPlanPlan {
354 stage,
355 format,
356 config,
357 explainee: Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc }),
358 }) => {
359 let optimizer_trace = OptimizerTrace::new(stage.paths());
361 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
362 broken: *broken,
363 config: config.clone(),
364 format: *format,
365 stage: *stage,
366 replan: None,
367 desc: Some(desc.clone()),
368 optimizer_trace,
369 });
370 (QueryPlan::Select(plan), explain_ctx)
371 }
372 Plan::CopyTo(plan::CopyToPlan {
374 select_plan,
375 desc,
376 to,
377 connection,
378 connection_id,
379 format,
380 max_file_size,
381 }) => {
382 let uri = eval_copy_to_uri(to.clone(), session, catalog.state())?;
383
384 let copy_to_ctx = CopyToContext {
386 desc: desc.clone(),
387 uri,
388 connection: connection.clone(),
389 connection_id: *connection_id,
390 format: format.clone(),
391 max_file_size: *max_file_size,
392 output_batch_count: None,
393 };
394
395 (
396 QueryPlan::CopyTo(select_plan, copy_to_ctx),
397 ExplainContext::None,
398 )
399 }
400 Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => {
401 match explainee {
403 plan::Explainee::Statement(plan::ExplaineeStatement::Select {
404 broken: false,
405 plan,
406 desc: _,
407 }) => {
408 let explain_ctx = ExplainContext::Pushdown;
409 (QueryPlan::Select(plan), explain_ctx)
410 }
411 _ => {
412 soft_panic_or_log!(
415 "unexpected EXPLAIN FILTER PUSHDOWN plan kind in frontend peek sequencing: {:?}",
416 explainee
417 );
418 debug!(
419 "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN"
420 );
421 return Ok(None);
422 }
423 }
424 }
425 Plan::SideEffectingFunc(sef_plan) => {
426 let response = self
430 .call_coordinator(|tx| Command::ExecuteSideEffectingFunc {
431 plan: sef_plan.clone(),
432 conn_id: session.conn_id().clone(),
433 current_role: session.role_metadata().current_role,
434 tx,
435 })
436 .await?;
437 return Ok(Some(response));
438 }
439 Plan::Subscribe(subscribe) => (QueryPlan::Subscribe(subscribe), ExplainContext::None),
440 _ => {
441 soft_panic_or_log!(
444 "Unexpected plan kind in frontend peek sequencing: {:?}",
445 plan
446 );
447 debug!(
448 "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, side-effecting SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO S3"
449 );
450 return Ok(None);
451 }
452 };
453
454 let when = match query_plan {
455 QueryPlan::Select(s) => &s.when,
456 QueryPlan::CopyTo(s, _) => &s.when,
457 QueryPlan::Subscribe(s) => &s.when,
458 };
459
460 let depends_on = match query_plan {
461 QueryPlan::Select(s) => s.source.depends_on(),
462 QueryPlan::CopyTo(s, _) => s.source.depends_on(),
463 QueryPlan::Subscribe(s) => s.from.depends_on(),
464 };
465
466 let contains_temporal = match query_plan {
467 QueryPlan::Select(s) => s.source.contains_temporal(),
468 QueryPlan::CopyTo(s, _) => s.source.contains_temporal(),
469 QueryPlan::Subscribe(s) => Ok(s.from.contains_temporal()),
470 };
471
472 assert!(plan.allowed_in_read_only());
476
477 let (cluster, target_cluster_id, target_cluster_name) = {
478 let target_cluster = match session.transaction().cluster() {
479 Some(cluster_id) => TargetCluster::Transaction(cluster_id),
481 None => coord::catalog_serving::auto_run_on_catalog_server(
483 &conn_catalog,
484 session,
485 &plan,
486 ),
487 };
488 let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
489 (cluster, cluster.id, &cluster.name)
490 };
491
492 if let Some(logging_id) = &statement_logging_id {
494 self.log_set_cluster(*logging_id, target_cluster_id);
495 }
496
497 coord::catalog_serving::check_cluster_restrictions(
498 target_cluster_name.as_str(),
499 &conn_catalog,
500 &plan,
501 )?;
502
503 rbac::check_plan(
504 &conn_catalog,
505 None::<fn(u32) -> Option<RoleId>>,
508 session,
509 &plan,
510 Some(target_cluster_id),
511 &resolved_ids,
512 )?;
513
514 if let Some((_, wait_future)) =
515 coord::appends::waiting_on_startup_appends(&*catalog, session, &plan)
516 {
517 wait_future.await;
518 }
519
520 let max_query_result_size = Some(session.vars().max_query_result_size());
521
522 let compute_instance_snapshot =
527 ComputeInstanceSnapshot::new_without_collections(cluster.id());
528
529 let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config())
530 .override_from(&catalog.get_cluster(cluster.id()).config.features())
531 .override_from(&explain_ctx);
532
533 if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
534 return Err(AdapterError::NoClusterReplicasAvailable {
535 name: cluster.name.clone(),
536 is_managed: cluster.is_managed(),
537 });
538 }
539
540 let (_, view_id) = self.transient_id_gen.allocate_id();
541 let (_, index_id) = self.transient_id_gen.allocate_id();
542
543 let target_replica_name = session.vars().cluster_replica();
544 let mut target_replica = target_replica_name
545 .map(|name| {
546 cluster
547 .replica_id(name)
548 .ok_or(AdapterError::UnknownClusterReplica {
549 cluster_name: cluster.name.clone(),
550 replica_name: name.to_string(),
551 })
552 })
553 .transpose()?;
554
555 let source_ids = depends_on;
556 let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?;
560 if matches!(timeline_context, TimelineContext::TimestampIndependent) && contains_temporal? {
561 timeline_context = TimelineContext::TimestampDependent;
565 }
566
567 let notices = coord::sequencer::check_log_reads(
568 &catalog,
569 cluster,
570 &source_ids,
571 &mut target_replica,
572 session.vars(),
573 )?;
574 session.add_notices(notices);
575
576 let isolation_level = session.vars().transaction_isolation().clone();
579 let timeline = Coordinator::get_timeline(&timeline_context);
580 let needs_linearized_read_ts =
581 Coordinator::needs_linearized_read_ts(&isolation_level, when);
582
583 let oracle_read_ts = match timeline {
584 Some(timeline) if needs_linearized_read_ts => {
585 let oracle = self.ensure_oracle(timeline).await?;
586 let oracle_read_ts = oracle.read_ts().await;
587 Some(oracle_read_ts)
588 }
589 Some(_) | None => None,
590 };
591
592 let vars = session.vars();
595 let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
596 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
597 && !session.contains_read_timestamp()
598 {
599 self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
601 source_ids: source_ids.clone(),
602 real_time_recency_timeout: *vars.real_time_recency_timeout(),
603 tx,
604 })
605 .await?
606 } else {
607 None
608 };
609
610 let dataflow_builder =
613 DataflowBuilder::new(catalog.state(), compute_instance_snapshot.clone());
614 let input_id_bundle = dataflow_builder.sufficient_collections(source_ids.clone());
615
616 let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when)
629 && !matches!(query_plan, QueryPlan::Subscribe { .. });
630
631 let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
633 Some(
636 determination @ TimestampDetermination {
637 timestamp_context: TimestampContext::TimelineTimestamp { .. },
638 ..
639 },
640 ) if in_immediate_multi_stmt_txn => {
641 let txn_read_holds_opt = self
649 .call_coordinator(|tx| Command::GetTransactionReadHoldsBundle {
650 conn_id: session.conn_id().clone(),
651 tx,
652 })
653 .await;
654
655 if let Some(txn_read_holds) = txn_read_holds_opt {
656 let allowed_id_bundle = txn_read_holds.id_bundle();
657 let outside = input_id_bundle.difference(&allowed_id_bundle);
658
659 if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
661 let valid_names =
662 allowed_id_bundle.resolve_names(&*catalog, session.conn_id());
663 let invalid_names = outside.resolve_names(&*catalog, session.conn_id());
664 return Err(AdapterError::RelationOutsideTimeDomain {
665 relations: invalid_names,
666 names: valid_names,
667 });
668 }
669
670 let read_holds = txn_read_holds.subset(&input_id_bundle);
672
673 (determination, read_holds)
674 } else {
675 return Err(AdapterError::Internal(
680 "Missing transaction read holds for multi-statement transaction"
681 .to_string(),
682 ));
683 }
684 }
685 _ => {
686 let timedomain_bundle;
693 let determine_bundle = if in_immediate_multi_stmt_txn {
694 timedomain_bundle = timedomain_for(
698 &*catalog,
699 &dataflow_builder,
700 &source_ids,
701 &timeline_context,
702 session.conn_id(),
703 target_cluster_id,
704 )?;
705 &timedomain_bundle
706 } else {
707 &input_id_bundle
709 };
710 let (determination, read_holds) = self
711 .frontend_determine_timestamp(
712 session,
713 determine_bundle,
714 when,
715 target_cluster_id,
716 &timeline_context,
717 oracle_read_ts,
718 real_time_recency_ts,
719 )
720 .await?;
721
722 if in_immediate_multi_stmt_txn {
726 self.call_coordinator(|tx| Command::StoreTransactionReadHolds {
727 conn_id: session.conn_id().clone(),
728 read_holds: read_holds.clone(),
729 tx,
730 })
731 .await;
732 }
733
734 (determination, read_holds)
735 }
736 };
737
738 {
739 for id in input_id_bundle.iter() {
741 let s = read_holds.storage_holds.contains_key(&id);
742 let c = read_holds
743 .compute_ids()
744 .map(|(_instance, coll)| coll)
745 .contains(&id);
746 soft_assert_or_log!(
747 s || c,
748 "missing read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
749 id,
750 in_immediate_multi_stmt_txn,
751 );
752 }
753
754 for id in input_id_bundle.storage_ids.iter() {
757 soft_assert_or_log!(
758 read_holds.storage_holds.contains_key(id),
759 "missing storage read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
760 id,
761 in_immediate_multi_stmt_txn,
762 );
763 }
764 for id in input_id_bundle
765 .compute_ids
766 .iter()
767 .flat_map(|(_instance, colls)| colls)
768 {
769 soft_assert_or_log!(
770 read_holds
771 .compute_ids()
772 .map(|(_instance, coll)| coll)
773 .contains(id),
774 "missing compute read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
775 id,
776 in_immediate_multi_stmt_txn,
777 );
778 }
779 }
780
781 let requires_linearization = (&explain_ctx).into();
794 let mut transaction_determination = determination.clone();
795 match query_plan {
796 QueryPlan::Subscribe { .. } => {
797 if when.is_transactional() {
798 session.add_transaction_ops(TransactionOps::Subscribe)?;
799 }
800 }
801 QueryPlan::Select(..) | QueryPlan::CopyTo(..) => {
802 if when.is_transactional() {
803 session.add_transaction_ops(TransactionOps::Peeks {
804 determination: transaction_determination,
805 cluster_id: target_cluster_id,
806 requires_linearization,
807 })?;
808 } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
809 transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
811 session.add_transaction_ops(TransactionOps::Peeks {
812 determination: transaction_determination,
813 cluster_id: target_cluster_id,
814 requires_linearization,
815 })?;
816 }
817 }
818 }
819
820 let stats = statistics_oracle(
823 session,
824 &source_ids,
825 &determination.timestamp_context.antichain(),
826 true,
827 catalog.system_config(),
828 &*self.storage_collections,
829 )
830 .await
831 .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
832
833 let timestamp_context = determination.timestamp_context.clone();
836 let session_meta = session.meta();
837 let now = catalog.config().now.clone();
838 let target_cluster_name = target_cluster_name.clone();
839 let needs_plan_insights = explain_ctx.needs_plan_insights();
840 let determination_for_pushdown = if matches!(explain_ctx, ExplainContext::Pushdown) {
841 Some(determination.clone())
844 } else {
845 None
846 };
847
848 let span = Span::current();
849
850 let catalog_for_insights = if needs_plan_insights {
852 Some(Arc::clone(&catalog))
853 } else {
854 None
855 };
856 let mut compute_instances = BTreeMap::new();
857 if needs_plan_insights {
858 for user_cluster in catalog.user_clusters() {
859 let snapshot = ComputeInstanceSnapshot::new_without_collections(user_cluster.id);
860 compute_instances.insert(user_cluster.name.clone(), snapshot);
861 }
862 }
863
864 let source_ids_for_closure = source_ids.clone();
865
866 let optimization_future: JoinHandle<Result<_, AdapterError>> = match query_plan {
867 QueryPlan::CopyTo(select_plan, mut copy_to_ctx) => {
868 let raw_expr = select_plan.source.clone();
869
870 let worker_counts = cluster.replicas().map(|r| {
872 let loc = &r.config.location;
873 loc.workers().unwrap_or_else(|| loc.num_processes())
874 });
875 let max_worker_count = match worker_counts.max() {
876 Some(count) => u64::cast_from(count),
877 None => {
878 return Err(AdapterError::NoClusterReplicasAvailable {
879 name: cluster.name.clone(),
880 is_managed: cluster.is_managed(),
881 });
882 }
883 };
884 copy_to_ctx.output_batch_count = Some(max_worker_count);
885
886 let mut optimizer = optimize::copy_to::Optimizer::new(
887 Arc::clone(&catalog),
888 compute_instance_snapshot,
889 view_id,
890 copy_to_ctx,
891 optimizer_config,
892 self.optimizer_metrics.clone(),
893 );
894
895 mz_ore::task::spawn_blocking(
896 || "optimize copy-to",
897 move || {
898 span.in_scope(|| {
899 let _dispatch_guard = explain_ctx.dispatch_guard();
900
901 let local_mir_plan =
904 optimizer.catch_unwind_optimize(raw_expr.clone())?;
905 let local_mir_plan = local_mir_plan.resolve(
907 timestamp_context.clone(),
908 &session_meta,
909 stats,
910 );
911 let global_lir_plan =
913 optimizer.catch_unwind_optimize(local_mir_plan)?;
914 Ok(Execution::CopyToS3 {
915 global_lir_plan,
916 source_ids: source_ids_for_closure,
917 })
918 })
919 },
920 )
921 }
922 QueryPlan::Select(select_plan) => {
923 let select_plan = select_plan.clone();
924 let raw_expr = select_plan.source.clone();
925
926 let mut optimizer = optimize::peek::Optimizer::new(
928 Arc::clone(&catalog),
929 compute_instance_snapshot,
930 select_plan.finishing.clone(),
931 view_id,
932 index_id,
933 optimizer_config,
934 self.optimizer_metrics.clone(),
935 );
936
937 mz_ore::task::spawn_blocking(
938 || "optimize peek",
939 move || {
940 span.in_scope(|| {
941 let _dispatch_guard = explain_ctx.dispatch_guard();
942
943 let pipeline = || {
950 let local_mir_plan =
951 optimizer.catch_unwind_optimize(raw_expr.clone())?;
952 let local_mir_plan = local_mir_plan.resolve(
954 timestamp_context.clone(),
955 &session_meta,
956 stats,
957 );
958 let global_lir_plan =
960 optimizer.catch_unwind_optimize(local_mir_plan)?;
961 Ok::<_, AdapterError>(global_lir_plan)
962 };
963
964 let global_lir_plan_result = pipeline();
965 let optimization_finished_at = now();
966
967 let create_insights_ctx =
968 |optimizer: &optimize::peek::Optimizer,
969 is_notice: bool|
970 -> Option<Box<PlanInsightsContext>> {
971 if !needs_plan_insights {
972 return None;
973 }
974
975 let catalog = catalog_for_insights.as_ref()?;
976
977 let enable_re_optimize = if needs_plan_insights {
978 let dyncfgs = catalog.system_config().dyncfgs();
986 let opt_limit = mz_adapter_types::dyncfgs
987 ::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
988 .get(dyncfgs);
989 !(is_notice && optimizer.duration() > opt_limit)
990 } else {
991 false
992 };
993
994 Some(Box::new(PlanInsightsContext {
995 stmt: select_plan
996 .select
997 .as_deref()
998 .map(Clone::clone)
999 .map(Statement::Select),
1000 raw_expr: raw_expr.clone(),
1001 catalog: Arc::clone(catalog),
1002 compute_instances,
1003 target_instance: target_cluster_name,
1004 metrics: optimizer.metrics().clone(),
1005 finishing: optimizer.finishing().clone(),
1006 optimizer_config: optimizer.config().clone(),
1007 session: session_meta,
1008 timestamp_context,
1009 view_id: optimizer.select_id(),
1010 index_id: optimizer.index_id(),
1011 enable_re_optimize,
1012 }))
1013 };
1014
1015 let global_lir_plan = match global_lir_plan_result {
1016 Ok(plan) => plan,
1017 Err(err) => {
1018 let result = if let ExplainContext::Plan(explain_ctx) =
1019 explain_ctx
1020 && explain_ctx.broken
1021 {
1022 tracing::error!(
1024 "error while handling EXPLAIN statement: {}",
1025 err
1026 );
1027 Ok(Execution::ExplainPlan {
1028 df_meta: Default::default(),
1029 explain_ctx,
1030 optimizer,
1031 insights_ctx: None,
1032 })
1033 } else {
1034 Err(err)
1035 };
1036 return result;
1037 }
1038 };
1039
1040 match explain_ctx {
1041 ExplainContext::Plan(explain_ctx) => {
1042 let (_, df_meta, _) = global_lir_plan.unapply();
1043 let insights_ctx = create_insights_ctx(&optimizer, false);
1044 Ok(Execution::ExplainPlan {
1045 df_meta,
1046 explain_ctx,
1047 optimizer,
1048 insights_ctx,
1049 })
1050 }
1051 ExplainContext::None => Ok(Execution::Peek {
1052 global_lir_plan,
1053 optimization_finished_at,
1054 plan_insights_optimizer_trace: None,
1055 finishing: select_plan.finishing,
1056 copy_to: select_plan.copy_to,
1057 insights_ctx: None,
1058 }),
1059 ExplainContext::PlanInsightsNotice(optimizer_trace) => {
1060 let insights_ctx = create_insights_ctx(&optimizer, true);
1061 Ok(Execution::Peek {
1062 global_lir_plan,
1063 optimization_finished_at,
1064 plan_insights_optimizer_trace: Some(optimizer_trace),
1065 finishing: select_plan.finishing,
1066 copy_to: select_plan.copy_to,
1067 insights_ctx,
1068 })
1069 }
1070 ExplainContext::Pushdown => {
1071 let (plan, _, _) = global_lir_plan.unapply();
1072 let imports = match plan {
1073 PeekPlan::SlowPath(plan) => plan
1074 .desc
1075 .source_imports
1076 .into_iter()
1077 .filter_map(|(id, import)| {
1078 import.desc.arguments.operators.map(|mfp| (id, mfp))
1079 })
1080 .collect(),
1081 PeekPlan::FastPath(_) => {
1082 std::collections::BTreeMap::default()
1083 }
1084 };
1085 Ok(Execution::ExplainPushdown {
1086 imports,
1087 determination: determination_for_pushdown
1088 .expect("it's present for the ExplainPushdown case"),
1089 })
1090 }
1091 }
1092 })
1093 },
1094 )
1095 }
1096 QueryPlan::Subscribe(plan) => {
1097 let plan = plan.clone();
1098 let catalog: Arc<Catalog> = Arc::clone(&catalog);
1099 let debug_name = format!("subscribe-{}", index_id);
1100 let mut optimizer = optimize::subscribe::Optimizer::new(
1101 catalog,
1102 compute_instance_snapshot.clone(),
1103 view_id,
1104 index_id,
1105 plan.with_snapshot,
1106 plan.up_to,
1107 debug_name,
1108 optimizer_config,
1109 self.optimizer_metrics.clone(),
1110 );
1111 mz_ore::task::spawn_blocking(
1112 || "optimize subscribe",
1113 move || {
1114 span.in_scope(|| {
1115 let _dispatch_guard = explain_ctx.dispatch_guard();
1116
1117 let global_mir_plan = optimizer.catch_unwind_optimize(plan.clone())?;
1118 let as_of = timestamp_context.timestamp_or_default();
1119
1120 if let Some(up_to) = optimizer.up_to() {
1121 if as_of > up_to {
1122 return Err(AdapterError::AbsurdSubscribeBounds {
1123 as_of,
1124 up_to,
1125 });
1126 }
1127 }
1128 let local_mir_plan =
1129 global_mir_plan.resolve(Antichain::from_elem(as_of));
1130
1131 let global_lir_plan =
1132 optimizer.catch_unwind_optimize(local_mir_plan)?;
1133 let optimization_finished_at = now();
1134
1135 let (df_desc, df_meta) = global_lir_plan.unapply();
1136 Ok(Execution::Subscribe {
1137 subscribe_plan: plan,
1138 df_desc,
1139 df_meta,
1140 optimization_finished_at,
1141 })
1142 })
1143 },
1144 )
1145 }
1146 };
1147
1148 let mut optimization_timeout = *session.vars().statement_timeout();
1149 if optimization_timeout == Duration::ZERO {
1151 optimization_timeout = Duration::MAX;
1152 }
1153 let optimization_result =
1154 match tokio::time::timeout(optimization_timeout, optimization_future).await {
1159 Ok(Ok(result)) => result,
1160 Ok(Err(AdapterError::Optimizer(err))) => {
1161 return Err(AdapterError::Internal(format!(
1162 "internal error in optimizer: {}",
1163 err
1164 )));
1165 }
1166 Ok(Err(err)) => {
1167 return Err(err);
1168 }
1169 Err(_elapsed) => {
1170 warn!("optimize peek timed out after {:?}", optimization_timeout);
1171 return Err(AdapterError::StatementTimeout);
1172 }
1173 };
1174
1175 if let Some(logging_id) = &statement_logging_id {
1177 self.log_lifecycle_event(*logging_id, StatementLifecycleEvent::OptimizationFinished);
1178 }
1179
1180 Self::assert_read_holds_correct(
1182 &read_holds,
1183 &optimization_result,
1184 &determination,
1185 target_cluster_id,
1186 in_immediate_multi_stmt_txn,
1187 );
1188
1189 match optimization_result {
1191 Execution::ExplainPlan {
1192 df_meta,
1193 explain_ctx,
1194 optimizer,
1195 insights_ctx,
1196 } => {
1197 let rows = coord::sequencer::explain_plan_inner(
1198 session,
1199 &catalog,
1200 df_meta,
1201 explain_ctx,
1202 optimizer,
1203 insights_ctx,
1204 )
1205 .await?;
1206
1207 Ok(Some(ExecuteResponse::SendingRowsImmediate {
1208 rows: Box::new(rows.into_row_iter()),
1209 }))
1210 }
1211 Execution::ExplainPushdown {
1212 imports,
1213 determination,
1214 } => {
1215 let as_of = determination.timestamp_context.antichain();
1218 let mz_now = determination
1219 .timestamp_context
1220 .timestamp()
1221 .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1222 .unwrap_or_else(ResultSpec::value_all);
1223
1224 Ok(Some(
1225 coord::sequencer::explain_pushdown_future_inner(
1226 session,
1227 &*catalog,
1228 &self.storage_collections,
1229 as_of,
1230 mz_now,
1231 imports,
1232 )
1233 .await
1234 .await?,
1235 ))
1236 }
1237 Execution::Peek {
1238 global_lir_plan,
1239 optimization_finished_at: _optimization_finished_at,
1240 plan_insights_optimizer_trace,
1241 finishing,
1242 copy_to,
1243 insights_ctx,
1244 } => {
1245 let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
1250
1251 coord::sequencer::emit_optimizer_notices(
1252 &*catalog,
1253 session,
1254 &df_meta.optimizer_notices,
1255 );
1256
1257 if let Some(trace) = plan_insights_optimizer_trace {
1259 let target_cluster = catalog.get_cluster(target_cluster_id);
1260 let features = OptimizerFeatures::from(catalog.system_config())
1261 .override_from(&target_cluster.config.features());
1262 let insights = trace
1263 .into_plan_insights(
1264 &features,
1265 &catalog.for_session(session),
1266 Some(finishing.clone()),
1267 Some(target_cluster),
1268 df_meta.clone(),
1269 insights_ctx,
1270 )
1271 .await?;
1272 session.add_notice(AdapterNotice::PlanInsights(insights));
1273 }
1274
1275 let watch_set = statement_logging_id.map(|logging_id| {
1278 WatchSetCreation::new(
1279 logging_id,
1280 catalog.state(),
1281 &input_id_bundle,
1282 determination.timestamp_context.timestamp_or_default(),
1283 )
1284 });
1285
1286 let max_result_size = catalog.system_config().max_result_size();
1287
1288 let determination_for_notice = if session.vars().emit_timestamp_notice() {
1291 Some(determination.clone())
1292 } else {
1293 None
1294 };
1295
1296 let response = match peek_plan {
1297 PeekPlan::FastPath(fast_path_plan) => {
1298 if let Some(logging_id) = &statement_logging_id {
1299 if !matches!(fast_path_plan, FastPathPlan::Constant(..)) {
1313 self.log_set_timestamp(
1314 *logging_id,
1315 determination.timestamp_context.timestamp_or_default(),
1316 );
1317 }
1318 }
1319
1320 let row_set_finishing_seconds =
1321 session.metrics().row_set_finishing_seconds().clone();
1322
1323 let peek_stash_read_batch_size_bytes =
1324 mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
1325 .get(catalog.system_config().dyncfgs());
1326 let peek_stash_read_memory_budget_bytes =
1327 mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
1328 .get(catalog.system_config().dyncfgs());
1329
1330 self.implement_fast_path_peek_plan(
1331 fast_path_plan,
1332 determination.timestamp_context.timestamp_or_default(),
1333 finishing,
1334 target_cluster_id,
1335 target_replica,
1336 typ,
1337 max_result_size,
1338 max_query_result_size,
1339 row_set_finishing_seconds,
1340 read_holds,
1341 peek_stash_read_batch_size_bytes,
1342 peek_stash_read_memory_budget_bytes,
1343 session.conn_id().clone(),
1344 source_ids,
1345 watch_set,
1346 )
1347 .await?
1348 }
1349 PeekPlan::SlowPath(dataflow_plan) => {
1350 if let Some(logging_id) = &statement_logging_id {
1351 self.log_set_transient_index_id(*logging_id, dataflow_plan.id);
1352 }
1353
1354 self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
1355 dataflow_plan: Box::new(dataflow_plan),
1356 determination,
1357 finishing,
1358 compute_instance: target_cluster_id,
1359 target_replica,
1360 intermediate_result_type: typ,
1361 source_ids,
1362 conn_id: session.conn_id().clone(),
1363 max_result_size,
1364 max_query_result_size,
1365 watch_set,
1366 tx,
1367 })
1368 .await?
1369 }
1370 };
1371
1372 if let Some(determination) = determination_for_notice {
1374 let explanation = self
1375 .call_coordinator(|tx| Command::ExplainTimestamp {
1376 conn_id: session.conn_id().clone(),
1377 session_wall_time: session.pcx().wall_time,
1378 cluster_id: target_cluster_id,
1379 id_bundle: input_id_bundle.clone(),
1380 determination,
1381 tx,
1382 })
1383 .await;
1384 session.add_notice(AdapterNotice::QueryTimestamp { explanation });
1385 }
1386
1387 Ok(Some(match copy_to {
1388 None => response,
1389 Some(format) => ExecuteResponse::CopyTo {
1391 format,
1392 resp: Box::new(response),
1393 },
1394 }))
1395 }
1396 Execution::Subscribe {
1397 subscribe_plan,
1398 df_desc,
1399 df_meta,
1400 optimization_finished_at: _optimization_finished_at,
1401 } => {
1402 if df_desc.as_of.as_ref().expect("as of set") == &df_desc.until {
1403 session.add_notice(AdapterNotice::EqualSubscribeBounds {
1404 bound: *df_desc.until.as_option().expect("as of set"),
1405 });
1406 }
1407 coord::sequencer::emit_optimizer_notices(
1408 &*catalog,
1409 session,
1410 &df_meta.optimizer_notices,
1411 );
1412
1413 let response = self
1414 .call_coordinator(|tx| Command::ExecuteSubscribe {
1415 df_desc,
1416 dependency_ids: subscribe_plan.from.depends_on(),
1417 cluster_id: target_cluster_id,
1418 replica_id: target_replica,
1419 conn_id: session.conn_id().clone(),
1420 session_uuid: session.uuid(),
1421 read_holds,
1422 plan: subscribe_plan,
1423 statement_logging_id,
1424 tx,
1425 })
1426 .await?;
1427 Ok(Some(response))
1428 }
1429 Execution::CopyToS3 {
1430 global_lir_plan,
1431 source_ids,
1432 } => {
1433 let (df_desc, df_meta) = global_lir_plan.unapply();
1434
1435 coord::sequencer::emit_optimizer_notices(
1436 &*catalog,
1437 session,
1438 &df_meta.optimizer_notices,
1439 );
1440
1441 let sink_id = df_desc.sink_id();
1443 let sinks = &df_desc.sink_exports;
1444 if sinks.len() != 1 {
1445 return Err(AdapterError::Internal(
1446 "expected exactly one copy to s3 sink".into(),
1447 ));
1448 }
1449 let (_, sink_desc) = sinks
1450 .first_key_value()
1451 .expect("known to be exactly one copy to s3 sink");
1452 let s3_sink_connection = match &sink_desc.connection {
1453 mz_compute_types::sinks::ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1454 conn.clone()
1455 }
1456 _ => {
1457 return Err(AdapterError::Internal(
1458 "expected copy to s3 oneshot sink".into(),
1459 ));
1460 }
1461 };
1462
1463 self.call_coordinator(|tx| Command::CopyToPreflight {
1466 s3_sink_connection,
1467 sink_id,
1468 tx,
1469 })
1470 .await?;
1471
1472 let watch_set = statement_logging_id.map(|logging_id| {
1474 WatchSetCreation::new(
1475 logging_id,
1476 catalog.state(),
1477 &input_id_bundle,
1478 determination.timestamp_context.timestamp_or_default(),
1479 )
1480 });
1481
1482 let response = self
1483 .call_coordinator(|tx| Command::ExecuteCopyTo {
1484 df_desc: Box::new(df_desc),
1485 compute_instance: target_cluster_id,
1486 target_replica,
1487 source_ids,
1488 conn_id: session.conn_id().clone(),
1489 watch_set,
1490 tx,
1491 })
1492 .await?;
1493
1494 Ok(Some(response))
1495 }
1496 }
1497 }
1498
1499 pub(crate) async fn frontend_determine_timestamp(
1506 &mut self,
1507 session: &Session,
1508 id_bundle: &CollectionIdBundle,
1509 when: &QueryWhen,
1510 compute_instance: ComputeInstanceId,
1511 timeline_context: &TimelineContext,
1512 oracle_read_ts: Option<Timestamp>,
1513 real_time_recency_ts: Option<Timestamp>,
1514 ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
1515 let isolation_level = session.vars().transaction_isolation();
1518
1519 let (read_holds, upper) = self
1520 .acquire_read_holds_and_least_valid_write(id_bundle)
1521 .await
1522 .map_err(|err| {
1523 AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
1524 err,
1525 compute_instance,
1526 )
1527 })?;
1528 let (det, read_holds) = <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1529 session,
1530 id_bundle,
1531 when,
1532 timeline_context,
1533 oracle_read_ts,
1534 real_time_recency_ts,
1535 isolation_level,
1536 read_holds,
1537 upper.clone(),
1538 )?;
1539
1540 session
1541 .metrics()
1542 .determine_timestamp(&[
1543 match det.respond_immediately() {
1544 true => "true",
1545 false => "false",
1546 },
1547 isolation_level.as_str(),
1548 &compute_instance.to_string(),
1549 ])
1550 .inc();
1551 if !det.respond_immediately()
1552 && isolation_level == &IsolationLevel::StrictSerializable
1553 && real_time_recency_ts.is_none()
1554 {
1555 if let Some(strict) = det.timestamp_context.timestamp() {
1557 let (serializable_det, _tmp_read_holds) =
1558 <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1559 session,
1560 id_bundle,
1561 when,
1562 timeline_context,
1563 oracle_read_ts,
1564 real_time_recency_ts,
1565 &IsolationLevel::Serializable,
1566 read_holds.clone(),
1567 upper,
1568 )?;
1569 if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
1570 session
1571 .metrics()
1572 .timestamp_difference_for_strict_serializable_ms(&[compute_instance
1573 .to_string()
1574 .as_ref()])
1575 .observe(f64::cast_lossy(u64::from(
1576 strict.saturating_sub(*serializable),
1577 )));
1578 }
1579 }
1580 }
1581
1582 Ok((det, read_holds))
1583 }
1584
1585 fn assert_read_holds_correct(
1586 read_holds: &ReadHolds,
1587 execution: &Execution,
1588 determination: &TimestampDetermination,
1589 target_cluster_id: ClusterId,
1590 in_immediate_multi_stmt_txn: bool,
1591 ) {
1592 let (source_imports, index_imports, as_of, execution_name): (
1594 Vec<GlobalId>,
1595 Vec<GlobalId>,
1596 Timestamp,
1597 &str,
1598 ) = match execution {
1599 Execution::Peek {
1600 global_lir_plan, ..
1601 } => match global_lir_plan.peek_plan() {
1602 PeekPlan::FastPath(fast_path_plan) => {
1603 let (sources, indexes) = match fast_path_plan {
1604 FastPathPlan::Constant(..) => (vec![], vec![]),
1605 FastPathPlan::PeekExisting(_coll_id, idx_id, ..) => (vec![], vec![*idx_id]),
1606 FastPathPlan::PeekPersist(global_id, ..) => (vec![*global_id], vec![]),
1607 };
1608 (
1609 sources,
1610 indexes,
1611 determination.timestamp_context.timestamp_or_default(),
1612 "FastPath",
1613 )
1614 }
1615 PeekPlan::SlowPath(dataflow_plan) => {
1616 let as_of = dataflow_plan
1617 .desc
1618 .as_of
1619 .clone()
1620 .expect("dataflow has an as_of")
1621 .into_element();
1622 (
1623 dataflow_plan.desc.source_imports.keys().cloned().collect(),
1624 dataflow_plan.desc.index_imports.keys().cloned().collect(),
1625 as_of,
1626 "SlowPath",
1627 )
1628 }
1629 },
1630 Execution::CopyToS3 {
1631 global_lir_plan, ..
1632 } => {
1633 let df_desc = global_lir_plan.df_desc();
1634 let as_of = df_desc
1635 .as_of
1636 .clone()
1637 .expect("dataflow has an as_of")
1638 .into_element();
1639 (
1640 df_desc.source_imports.keys().cloned().collect(),
1641 df_desc.index_imports.keys().cloned().collect(),
1642 as_of,
1643 "CopyToS3",
1644 )
1645 }
1646 Execution::ExplainPlan { .. } | Execution::ExplainPushdown { .. } => {
1647 return;
1649 }
1650 Execution::Subscribe { df_desc, .. } => {
1651 let as_of = df_desc
1652 .as_of
1653 .clone()
1654 .expect("dataflow has an as_of")
1655 .into_element();
1656 (
1657 df_desc.source_imports.keys().cloned().collect(),
1658 df_desc.index_imports.keys().cloned().collect(),
1659 as_of,
1660 "Subscribe",
1661 )
1662 }
1663 };
1664
1665 for id in source_imports.iter() {
1667 soft_assert_or_log!(
1668 read_holds.storage_holds.contains_key(id),
1669 "[{}] missing read hold for the source import {}; (in_immediate_multi_stmt_txn: {})",
1670 execution_name,
1671 id,
1672 in_immediate_multi_stmt_txn,
1673 );
1674 }
1675 for id in index_imports.iter() {
1676 soft_assert_or_log!(
1677 read_holds
1678 .compute_ids()
1679 .map(|(_instance, coll)| coll)
1680 .contains(id),
1681 "[{}] missing read hold for the index import {}; (in_immediate_multi_stmt_txn: {})",
1682 execution_name,
1683 id,
1684 in_immediate_multi_stmt_txn,
1685 );
1686 }
1687
1688 for (id, h) in read_holds.storage_holds.iter() {
1690 soft_assert_or_log!(
1691 h.since().less_equal(&as_of),
1692 "[{}] storage read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1693 execution_name,
1694 h.since(),
1695 id,
1696 as_of,
1697 determination,
1698 in_immediate_multi_stmt_txn,
1699 );
1700 }
1701 for ((instance, id), h) in read_holds.compute_holds.iter() {
1702 soft_assert_eq_or_log!(
1703 *instance,
1704 target_cluster_id,
1705 "[{}] the read hold on {} is on the wrong cluster; (in_immediate_multi_stmt_txn: {})",
1706 execution_name,
1707 id,
1708 in_immediate_multi_stmt_txn,
1709 );
1710 soft_assert_or_log!(
1711 h.since().less_equal(&as_of),
1712 "[{}] compute read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1713 execution_name,
1714 h.since(),
1715 id,
1716 as_of,
1717 determination,
1718 in_immediate_multi_stmt_txn,
1719 );
1720 }
1721 }
1722}
1723
1724enum Execution {
1726 Peek {
1727 global_lir_plan: optimize::peek::GlobalLirPlan,
1728 optimization_finished_at: EpochMillis,
1729 plan_insights_optimizer_trace: Option<OptimizerTrace>,
1730 finishing: RowSetFinishing,
1731 copy_to: Option<plan::CopyFormat>,
1732 insights_ctx: Option<Box<PlanInsightsContext>>,
1733 },
1734 Subscribe {
1735 subscribe_plan: SubscribePlan,
1736 df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
1737 df_meta: DataflowMetainfo,
1738 optimization_finished_at: EpochMillis,
1739 },
1740 CopyToS3 {
1741 global_lir_plan: optimize::copy_to::GlobalLirPlan,
1742 source_ids: BTreeSet<GlobalId>,
1743 },
1744 ExplainPlan {
1745 df_meta: DataflowMetainfo,
1746 explain_ctx: ExplainPlanContext,
1747 optimizer: optimize::peek::Optimizer,
1748 insights_ctx: Option<Box<PlanInsightsContext>>,
1749 },
1750 ExplainPushdown {
1751 imports: BTreeMap<GlobalId, mz_expr::MapFilterProject>,
1752 determination: TimestampDetermination,
1753 },
1754}