use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::Duration;

use itertools::Itertools;
use mz_adapter_types::dyncfgs::ENABLE_FRONTEND_SUBSCRIBES;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::DataflowDescription;
use mz_controller_types::ClusterId;
use mz_expr::{CollectionPlan, ResultSpec, RowSetFinishing};
use mz_ore::cast::{CastFrom, CastLossy};
use mz_ore::collections::CollectionExt;
use mz_ore::now::EpochMillis;
use mz_ore::task::JoinHandle;
use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
use mz_repr::role_id::RoleId;
use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
use mz_sql::ast::Raw;
use mz_sql::catalog::CatalogCluster;
use mz_sql::plan::Params;
use mz_sql::plan::{
    self, Explainee, ExplaineeStatement, Plan, QueryWhen, SelectPlan, SubscribePlan,
};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::IsolationLevel;
use mz_sql_parser::ast::{CopyDirection, ExplainStage, ShowStatement, Statement};
use mz_transform::EmptyStatisticsOracle;
use mz_transform::dataflow::DataflowMetainfo;
use opentelemetry::trace::TraceContextExt;
use timely::progress::Antichain;
use tracing::{Span, debug, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::catalog::Catalog;
use crate::command::Command;
use crate::coord::peek::{FastPathPlan, PeekPlan};
use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle};
use crate::coord::timeline::timedomain_for;
use crate::coord::timestamp_selection::TimestampDetermination;
use crate::coord::{
    Coordinator, CopyToContext, ExecuteContextGuard, ExplainContext, ExplainPlanContext,
    TargetCluster,
};
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::OptimizerTrace;
use crate::optimize::Optimize;
use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
use crate::session::{Session, TransactionOps, TransactionStatus};
use crate::statement_logging::WatchSetCreation;
use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
use crate::{
    AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, PeekClient, ReadHolds,
    TimelineContext, TimestampContext, TimestampProvider, optimize,
};
use crate::{coord, metrics};

impl PeekClient {
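    /// Attempts to sequence the peek described by the named portal directly from the
    /// session frontend. Returns `Ok(None)` when the statement cannot be handled on
    /// this path, signalling the caller to fall back to regular coordinator-based
    /// sequencing.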
    pub(crate) async fn try_frontend_peek(
        &mut self,
        portal_name: &str,
        session: &mut Session,
        outer_ctx_extra: &mut Option<ExecuteContextGuard>,
    ) -> Result<Option<ExecuteResponse>, AdapterError> {
        if session.vars().emit_trace_id_notice() {
            let span_context = tracing::Span::current()
                .context()
                .span()
                .span_context()
                .clone();
            if span_context.is_valid() {
                session.add_notice(AdapterNotice::QueryTrace {
                    trace_id: span_context.trace_id(),
                });
            }
        }

        let catalog = self.catalog_snapshot("try_frontend_peek").await;

        let (stmt, params, logging, lifecycle_timestamps) = {
            if let Err(err) = Coordinator::verify_portal(&*catalog, session, portal_name) {
                outer_ctx_extra
                    .take()
                    .and_then(|guard| guard.defuse().retire());
                return Err(err);
            }
            let portal = session
                .get_portal_unverified(portal_name)
                .expect("called verify_portal above");
            let params = portal.parameters.clone();
            let stmt = portal.stmt.clone();
            let logging = Arc::clone(&portal.logging);
            let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
            (stmt, params, logging, lifecycle_timestamps)
        };

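        // Only a subset of statement kinds is handled on the frontend path; anything
        // else bails out early with `Ok(None)`.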
        if let Some(ref stmt) = stmt {
            match &**stmt {
                Statement::Select(_)
                | Statement::ExplainAnalyzeObject(_)
                | Statement::ExplainAnalyzeCluster(_)
                | Statement::Show(ShowStatement::ShowObjects(_))
                | Statement::Show(ShowStatement::ShowColumns(_)) => {}
                Statement::ExplainPlan(explain_stmt) => {
                    match &explain_stmt.explainee {
                        mz_sql_parser::ast::Explainee::Select(..) => {}
                        _ => {
                            debug!(
                                "Bailing out from try_frontend_peek, because EXPLAIN is not for a SELECT query"
                            );
                            return Ok(None);
                        }
                    }
                }
                Statement::ExplainPushdown(explain_stmt) => {
                    match &explain_stmt.explainee {
                        mz_sql_parser::ast::Explainee::Select(_, false) => {}
                        _ => {
                            debug!(
                                "Bailing out from try_frontend_peek, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is for EXPLAIN BROKEN"
                            );
                            return Ok(None);
                        }
                    }
                }
                Statement::Copy(copy_stmt) => {
                    match &copy_stmt.direction {
                        CopyDirection::To => {}
                        CopyDirection::From => {
                            debug!(
                                "Bailing out from try_frontend_peek, because COPY FROM is not supported"
                            );
                            return Ok(None);
                        }
                    }
                }

                Statement::Subscribe(_)
                    if ENABLE_FRONTEND_SUBSCRIBES.get(catalog.system_config().dyncfgs()) => {}
                _ => {
                    debug!(
                        "Bailing out from try_frontend_peek, because statement type is not supported"
                    );
                    return Ok(None);
                }
            }
        }

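        // Begin statement logging here unless an outer execute context already did;
        // in that case retire the outer context and take over its logging id.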
        let statement_logging_id = if outer_ctx_extra.is_none() {
            let result = self.statement_logging_frontend.begin_statement_execution(
                session,
                &params,
                &logging,
                catalog.system_config(),
                lifecycle_timestamps,
            );

            if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result {
                self.log_began_execution(began_execution, mseh_update, prepared_statement);
                Some(logging_id)
            } else {
                None
            }
        } else {
            outer_ctx_extra
                .take()
                .and_then(|guard| guard.defuse().retire())
        };

        let result = self
            .try_frontend_peek_inner(session, catalog, stmt, params, statement_logging_id)
            .await;

        if let Some(logging_id) = statement_logging_id {
            let reason = match &result {
                Ok(Some(
                    ExecuteResponse::SendingRowsStreaming { .. }
                    | ExecuteResponse::Subscribing { .. },
                )) => {
                    return result;
                }
                Ok(Some(resp @ ExecuteResponse::CopyTo { resp: inner, .. })) => {
                    match inner.as_ref() {
                        ExecuteResponse::SendingRowsStreaming { .. }
                        | ExecuteResponse::Subscribing { .. } => {
                            return result;
                        }
                        _ => resp.into(),
                    }
                }
                Ok(None) => {
                    soft_panic_or_log!(
                        "Bailed out from `try_frontend_peek_inner` after we already logged the beginning of statement execution."
                    );
                    self.log_ended_execution(
                        logging_id,
                        StatementEndedExecutionReason::Errored {
                            error: "Internal error: bailed out from `try_frontend_peek_inner`"
                                .to_string(),
                        },
                    );
                    return result;
                }
                Ok(Some(resp)) => resp.into(),
                Err(e) => StatementEndedExecutionReason::Errored {
                    error: e.to_string(),
                },
            };

            self.log_ended_execution(logging_id, reason);
        }

        result
    }

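    /// The main body of frontend peek sequencing: names, plans, optimizes, and executes
    /// the statement, or returns `Ok(None)` to defer to the coordinator.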
    async fn try_frontend_peek_inner(
        &mut self,
        session: &mut Session,
        catalog: Arc<Catalog>,
        stmt: Option<Arc<Statement<Raw>>>,
        params: Params,
        statement_logging_id: Option<crate::statement_logging::StatementLoggingId>,
    ) -> Result<Option<ExecuteResponse>, AdapterError> {
        let stmt = match stmt {
            Some(stmt) => stmt,
            None => {
                debug!("try_frontend_peek_inner succeeded on an empty query");
                return Ok(Some(ExecuteResponse::EmptyQuery));
            }
        };

        session
            .metrics()
            .query_total(&[
                metrics::session_type_label_value(session.user()),
                metrics::statement_type_label_value(&stmt),
            ])
            .inc();

        let conn_catalog = catalog.for_session(session);
        let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?;

        let pcx = session.pcx();
        let plan = mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, &params, &resolved_ids)?;

        enum QueryPlan<'a> {
            Select(&'a SelectPlan),
            CopyTo(&'a SelectPlan, CopyToContext),
            Subscribe(&'a SubscribePlan),
        }

        let (query_plan, explain_ctx) = match &plan {
            Plan::Select(select_plan) => {
                let explain_ctx = if session.vars().emit_plan_insights_notice() {
                    let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
                    ExplainContext::PlanInsightsNotice(optimizer_trace)
                } else {
                    ExplainContext::None
                };
                (QueryPlan::Select(select_plan), explain_ctx)
            }
            Plan::ShowColumns(show_columns_plan) => {
                (
                    QueryPlan::Select(&show_columns_plan.select_plan),
                    ExplainContext::None,
                )
            }
            Plan::ExplainPlan(plan::ExplainPlanPlan {
                stage,
                format,
                config,
                explainee: Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc }),
            }) => {
                let optimizer_trace = OptimizerTrace::new(stage.paths());
                let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
                    broken: *broken,
                    config: config.clone(),
                    format: *format,
                    stage: *stage,
                    replan: None,
                    desc: Some(desc.clone()),
                    optimizer_trace,
                });
                (QueryPlan::Select(plan), explain_ctx)
            }
            Plan::CopyTo(plan::CopyToPlan {
                select_plan,
                desc,
                to,
                connection,
                connection_id,
                format,
                max_file_size,
            }) => {
                let uri = eval_copy_to_uri(to.clone(), session, catalog.state())?;

                let copy_to_ctx = CopyToContext {
                    desc: desc.clone(),
                    uri,
                    connection: connection.clone(),
                    connection_id: *connection_id,
                    format: format.clone(),
                    max_file_size: *max_file_size,
                    output_batch_count: None,
                };

                (
                    QueryPlan::CopyTo(select_plan, copy_to_ctx),
                    ExplainContext::None,
                )
            }
            Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => {
                match explainee {
                    plan::Explainee::Statement(plan::ExplaineeStatement::Select {
                        broken: false,
                        plan,
                        desc: _,
                    }) => {
                        let explain_ctx = ExplainContext::Pushdown;
                        (QueryPlan::Select(plan), explain_ctx)
                    }
                    _ => {
                        soft_panic_or_log!(
                            "unexpected EXPLAIN FILTER PUSHDOWN plan kind in frontend peek sequencing: {:?}",
                            explainee
                        );
                        debug!(
                            "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN"
                        );
                        return Ok(None);
                    }
                }
            }
            Plan::SideEffectingFunc(sef_plan) => {
                let response = self
                    .call_coordinator(|tx| Command::ExecuteSideEffectingFunc {
                        plan: sef_plan.clone(),
                        conn_id: session.conn_id().clone(),
                        current_role: session.role_metadata().current_role,
                        tx,
                    })
                    .await?;
                return Ok(Some(response));
            }
            Plan::Subscribe(subscribe) => (QueryPlan::Subscribe(subscribe), ExplainContext::None),
            _ => {
                soft_panic_or_log!(
                    "Unexpected plan kind in frontend peek sequencing: {:?}",
                    plan
                );
                debug!(
                    "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, side-effecting SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO S3"
                );
                return Ok(None);
            }
        };

        let when = match query_plan {
            QueryPlan::Select(s) => &s.when,
            QueryPlan::CopyTo(s, _) => &s.when,
            QueryPlan::Subscribe(s) => &s.when,
        };

        let depends_on = match query_plan {
            QueryPlan::Select(s) => s.source.depends_on(),
            QueryPlan::CopyTo(s, _) => s.source.depends_on(),
            QueryPlan::Subscribe(s) => s.from.depends_on(),
        };

        let contains_temporal = match query_plan {
            QueryPlan::Select(s) => s.source.contains_temporal(),
            QueryPlan::CopyTo(s, _) => s.source.contains_temporal(),
            QueryPlan::Subscribe(s) => Ok(s.from.contains_temporal()),
        };

        assert!(plan.allowed_in_read_only());

        let (cluster, target_cluster_id, target_cluster_name) = {
            let target_cluster = match session.transaction().cluster() {
                Some(cluster_id) => TargetCluster::Transaction(cluster_id),
                None => coord::catalog_serving::auto_run_on_catalog_server(
                    &conn_catalog,
                    session,
                    &plan,
                ),
            };
            let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
            (cluster, cluster.id, &cluster.name)
        };

        if let Some(logging_id) = &statement_logging_id {
            self.log_set_cluster(*logging_id, target_cluster_id);
        }

        coord::catalog_serving::check_cluster_restrictions(
            target_cluster_name.as_str(),
            &conn_catalog,
            &plan,
        )?;

        rbac::check_plan(
            &conn_catalog,
            None::<fn(u32) -> Option<RoleId>>,
            session,
            &plan,
            Some(target_cluster_id),
            &resolved_ids,
        )?;

        if let Some((_, wait_future)) =
            coord::appends::waiting_on_startup_appends(&*catalog, session, &plan)
        {
            wait_future.await;
        }

        let max_query_result_size = Some(session.vars().max_query_result_size());

        let compute_instance_snapshot =
            ComputeInstanceSnapshot::new_without_collections(cluster.id());

        let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config())
            .override_from(&catalog.get_cluster(cluster.id()).config.features())
            .override_from(&explain_ctx);

        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
            return Err(AdapterError::NoClusterReplicasAvailable {
                name: cluster.name.clone(),
                is_managed: cluster.is_managed(),
            });
        }

        let (_, view_id) = self.transient_id_gen.allocate_id();
        let (_, index_id) = self.transient_id_gen.allocate_id();

        let target_replica_name = session.vars().cluster_replica();
        let mut target_replica = target_replica_name
            .map(|name| {
                cluster
                    .replica_id(name)
                    .ok_or(AdapterError::UnknownClusterReplica {
                        cluster_name: cluster.name.clone(),
                        replica_name: name.to_string(),
                    })
            })
            .transpose()?;

        let source_ids = depends_on;
        let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?;
        if matches!(timeline_context, TimelineContext::TimestampIndependent) && contains_temporal? {
            timeline_context = TimelineContext::TimestampDependent;
        }

        let notices = coord::sequencer::check_log_reads(
            &catalog,
            cluster,
            &source_ids,
            &mut target_replica,
            session.vars(),
        )?;
        session.add_notices(notices);

        let isolation_level = session.vars().transaction_isolation().clone();
        let timeline = Coordinator::get_timeline(&timeline_context);
        let needs_linearized_read_ts =
            Coordinator::needs_linearized_read_ts(&isolation_level, when);

        let oracle_read_ts = match timeline {
            Some(timeline) if needs_linearized_read_ts => {
                let oracle = self.ensure_oracle(timeline).await?;
                let oracle_read_ts = oracle.read_ts().await;
                Some(oracle_read_ts)
            }
            Some(_) | None => None,
        };

        let vars = session.vars();
        let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
            && !session.contains_read_timestamp()
        {
            self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
                source_ids: source_ids.clone(),
                real_time_recency_timeout: *vars.real_time_recency_timeout(),
                tx,
            })
            .await?
        } else {
            None
        };

        let dataflow_builder =
            DataflowBuilder::new(catalog.state(), compute_instance_snapshot.clone());
        let input_id_bundle = dataflow_builder.sufficient_collections(source_ids.clone());

        let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when)
            && !matches!(query_plan, QueryPlan::Subscribe { .. });

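        // Determine the query timestamp and acquire read holds. Inside an immediate
        // multi-statement transaction the previously stored determination and read
        // holds are reused; otherwise a fresh timestamp is determined here.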
        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
            Some(
                determination @ TimestampDetermination {
                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
                    ..
                },
            ) if in_immediate_multi_stmt_txn => {
                let txn_read_holds_opt = self
                    .call_coordinator(|tx| Command::GetTransactionReadHoldsBundle {
                        conn_id: session.conn_id().clone(),
                        tx,
                    })
                    .await;

                if let Some(txn_read_holds) = txn_read_holds_opt {
                    let allowed_id_bundle = txn_read_holds.id_bundle();
                    let outside = input_id_bundle.difference(&allowed_id_bundle);

                    if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
                        let valid_names =
                            allowed_id_bundle.resolve_names(&*catalog, session.conn_id());
                        let invalid_names = outside.resolve_names(&*catalog, session.conn_id());
                        return Err(AdapterError::RelationOutsideTimeDomain {
                            relations: invalid_names,
                            names: valid_names,
                        });
                    }

                    let read_holds = txn_read_holds.subset(&input_id_bundle);

                    (determination, read_holds)
                } else {
                    return Err(AdapterError::Internal(
                        "Missing transaction read holds for multi-statement transaction"
                            .to_string(),
                    ));
                }
            }
            _ => {
                let timedomain_bundle;
                let determine_bundle = if in_immediate_multi_stmt_txn {
                    timedomain_bundle = timedomain_for(
                        &*catalog,
                        &dataflow_builder,
                        &source_ids,
                        &timeline_context,
                        session.conn_id(),
                        target_cluster_id,
                    )?;
                    &timedomain_bundle
                } else {
                    &input_id_bundle
                };
                let (determination, read_holds) = self
                    .frontend_determine_timestamp(
                        session,
                        determine_bundle,
                        when,
                        target_cluster_id,
                        &timeline_context,
                        oracle_read_ts,
                        real_time_recency_ts,
                    )
                    .await?;

                if in_immediate_multi_stmt_txn {
                    self.call_coordinator(|tx| Command::StoreTransactionReadHolds {
                        conn_id: session.conn_id().clone(),
                        read_holds: read_holds.clone(),
                        tx,
                    })
                    .await;
                }

                (determination, read_holds)
            }
        };

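        // Soft-assert that every collection in `input_id_bundle` is covered by a read hold.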
        {
            for id in input_id_bundle.iter() {
                let s = read_holds.storage_holds.contains_key(&id);
                let c = read_holds
                    .compute_ids()
                    .map(|(_instance, coll)| coll)
                    .contains(&id);
                soft_assert_or_log!(
                    s || c,
                    "missing read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
                    id,
                    in_immediate_multi_stmt_txn,
                );
            }

            for id in input_id_bundle.storage_ids.iter() {
                soft_assert_or_log!(
                    read_holds.storage_holds.contains_key(id),
                    "missing storage read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
                    id,
                    in_immediate_multi_stmt_txn,
                );
            }
            for id in input_id_bundle
                .compute_ids
                .iter()
                .flat_map(|(_instance, colls)| colls)
            {
                soft_assert_or_log!(
                    read_holds
                        .compute_ids()
                        .map(|(_instance, coll)| coll)
                        .contains(id),
                    "missing compute read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
                    id,
                    in_immediate_multi_stmt_txn,
                );
            }
        }

        let requires_linearization = (&explain_ctx).into();
        let mut transaction_determination = determination.clone();
        match query_plan {
            QueryPlan::Subscribe { .. } => {
                if when.is_transactional() {
                    session.add_transaction_ops(TransactionOps::Subscribe)?;
                }
            }
            QueryPlan::Select(..) | QueryPlan::CopyTo(..) => {
                if when.is_transactional() {
                    session.add_transaction_ops(TransactionOps::Peeks {
                        determination: transaction_determination,
                        cluster_id: target_cluster_id,
                        requires_linearization,
                    })?;
                } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
                    transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
                    session.add_transaction_ops(TransactionOps::Peeks {
                        determination: transaction_determination,
                        cluster_id: target_cluster_id,
                        requires_linearization,
                    })?;
                }
            }
        }

        let stats = statistics_oracle(
            session,
            &source_ids,
            &determination.timestamp_context.antichain(),
            true,
            catalog.system_config(),
            &*self.storage_collections,
        )
        .await
        .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));

        let timestamp_context = determination.timestamp_context.clone();
        let session_meta = session.meta();
        let now = catalog.config().now.clone();
        let target_cluster_name = target_cluster_name.clone();
        let needs_plan_insights = explain_ctx.needs_plan_insights();
        let determination_for_pushdown = if matches!(explain_ctx, ExplainContext::Pushdown) {
            Some(determination.clone())
        } else {
            None
        };

        let span = Span::current();

        let catalog_for_insights = if needs_plan_insights {
            Some(Arc::clone(&catalog))
        } else {
            None
        };
        let mut compute_instances = BTreeMap::new();
        if needs_plan_insights {
            for user_cluster in catalog.user_clusters() {
                let snapshot = ComputeInstanceSnapshot::new_without_collections(user_cluster.id);
                compute_instances.insert(user_cluster.name.clone(), snapshot);
            }
        }

        let source_ids_for_closure = source_ids.clone();

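        // Run the potentially expensive optimizer pipeline on a blocking task, with one
        // arm per query kind (COPY TO, SELECT-style peek, SUBSCRIBE).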
        let optimization_future: JoinHandle<Result<_, AdapterError>> = match query_plan {
            QueryPlan::CopyTo(select_plan, mut copy_to_ctx) => {
                let raw_expr = select_plan.source.clone();

                let worker_counts = cluster.replicas().map(|r| {
                    let loc = &r.config.location;
                    loc.workers().unwrap_or_else(|| loc.num_processes())
                });
                let max_worker_count = match worker_counts.max() {
                    Some(count) => u64::cast_from(count),
                    None => {
                        return Err(AdapterError::NoClusterReplicasAvailable {
                            name: cluster.name.clone(),
                            is_managed: cluster.is_managed(),
                        });
                    }
                };
                copy_to_ctx.output_batch_count = Some(max_worker_count);

                let mut optimizer = optimize::copy_to::Optimizer::new(
                    Arc::clone(&catalog),
                    compute_instance_snapshot,
                    view_id,
                    copy_to_ctx,
                    optimizer_config,
                    self.optimizer_metrics.clone(),
                );

                mz_ore::task::spawn_blocking(
                    || "optimize copy-to",
                    move || {
                        span.in_scope(|| {
                            let _dispatch_guard = explain_ctx.dispatch_guard();

                            let local_mir_plan =
                                optimizer.catch_unwind_optimize(raw_expr.clone())?;
                            let local_mir_plan = local_mir_plan.resolve(
                                timestamp_context.clone(),
                                &session_meta,
                                stats,
                            );
                            let global_lir_plan =
                                optimizer.catch_unwind_optimize(local_mir_plan)?;
                            Ok(Execution::CopyToS3 {
                                global_lir_plan,
                                source_ids: source_ids_for_closure,
                            })
                        })
                    },
                )
            }
            QueryPlan::Select(select_plan) => {
                let select_plan = select_plan.clone();
                let raw_expr = select_plan.source.clone();

                let mut optimizer = optimize::peek::Optimizer::new(
                    Arc::clone(&catalog),
                    compute_instance_snapshot,
                    select_plan.finishing.clone(),
                    view_id,
                    index_id,
                    optimizer_config,
                    self.optimizer_metrics.clone(),
                );

                mz_ore::task::spawn_blocking(
                    || "optimize peek",
                    move || {
                        span.in_scope(|| {
                            let _dispatch_guard = explain_ctx.dispatch_guard();

                            let pipeline = || {
                                let local_mir_plan =
                                    optimizer.catch_unwind_optimize(raw_expr.clone())?;
                                let local_mir_plan = local_mir_plan.resolve(
                                    timestamp_context.clone(),
                                    &session_meta,
                                    stats,
                                );
                                let global_lir_plan =
                                    optimizer.catch_unwind_optimize(local_mir_plan)?;
                                Ok::<_, AdapterError>(global_lir_plan)
                            };

                            let global_lir_plan_result = pipeline();
                            let optimization_finished_at = now();

                            let create_insights_ctx =
                                |optimizer: &optimize::peek::Optimizer,
                                 is_notice: bool|
                                 -> Option<Box<PlanInsightsContext>> {
                                    if !needs_plan_insights {
                                        return None;
                                    }

                                    let catalog = catalog_for_insights.as_ref()?;

                                    let enable_re_optimize = if needs_plan_insights {
                                        let dyncfgs = catalog.system_config().dyncfgs();
                                        let opt_limit = mz_adapter_types::dyncfgs
                                            ::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
                                            .get(dyncfgs);
                                        !(is_notice && optimizer.duration() > opt_limit)
                                    } else {
                                        false
                                    };

                                    Some(Box::new(PlanInsightsContext {
                                        stmt: select_plan
                                            .select
                                            .as_deref()
                                            .map(Clone::clone)
                                            .map(Statement::Select),
                                        raw_expr: raw_expr.clone(),
                                        catalog: Arc::clone(catalog),
                                        compute_instances,
                                        target_instance: target_cluster_name,
                                        metrics: optimizer.metrics().clone(),
                                        finishing: optimizer.finishing().clone(),
                                        optimizer_config: optimizer.config().clone(),
                                        session: session_meta,
                                        timestamp_context,
                                        view_id: optimizer.select_id(),
                                        index_id: optimizer.index_id(),
                                        enable_re_optimize,
                                    }))
                                };

                            let global_lir_plan = match global_lir_plan_result {
                                Ok(plan) => plan,
                                Err(err) => {
                                    let result = if let ExplainContext::Plan(explain_ctx) =
                                        explain_ctx
                                        && explain_ctx.broken
                                    {
                                        tracing::error!(
                                            "error while handling EXPLAIN statement: {}",
                                            err
                                        );
                                        Ok(Execution::ExplainPlan {
                                            df_meta: Default::default(),
                                            explain_ctx,
                                            optimizer,
                                            insights_ctx: None,
                                        })
                                    } else {
                                        Err(err)
                                    };
                                    return result;
                                }
                            };

                            match explain_ctx {
                                ExplainContext::Plan(explain_ctx) => {
                                    let (_, df_meta, _) = global_lir_plan.unapply();
                                    let insights_ctx = create_insights_ctx(&optimizer, false);
                                    Ok(Execution::ExplainPlan {
                                        df_meta,
                                        explain_ctx,
                                        optimizer,
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::None => Ok(Execution::Peek {
                                    global_lir_plan,
                                    optimization_finished_at,
                                    plan_insights_optimizer_trace: None,
                                    finishing: select_plan.finishing,
                                    copy_to: select_plan.copy_to,
                                    insights_ctx: None,
                                }),
                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
                                    let insights_ctx = create_insights_ctx(&optimizer, true);
                                    Ok(Execution::Peek {
                                        global_lir_plan,
                                        optimization_finished_at,
                                        plan_insights_optimizer_trace: Some(optimizer_trace),
                                        finishing: select_plan.finishing,
                                        copy_to: select_plan.copy_to,
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::Pushdown => {
                                    let (plan, _, _) = global_lir_plan.unapply();
                                    let imports = match plan {
                                        PeekPlan::SlowPath(plan) => plan
                                            .desc
                                            .source_imports
                                            .into_iter()
                                            .filter_map(|(id, import)| {
                                                import.desc.arguments.operators.map(|mfp| (id, mfp))
                                            })
                                            .collect(),
                                        PeekPlan::FastPath(_) => {
                                            std::collections::BTreeMap::default()
                                        }
                                    };
                                    Ok(Execution::ExplainPushdown {
                                        imports,
                                        determination: determination_for_pushdown
                                            .expect("it's present for the ExplainPushdown case"),
                                    })
                                }
                            }
                        })
                    },
                )
            }
            QueryPlan::Subscribe(plan) => {
                let plan = plan.clone();
                let catalog: Arc<Catalog> = Arc::clone(&catalog);
                let debug_name = format!("subscribe-{}", index_id);
                let mut optimizer = optimize::subscribe::Optimizer::new(
                    catalog,
                    compute_instance_snapshot.clone(),
                    view_id,
                    index_id,
                    plan.with_snapshot,
                    plan.up_to,
                    debug_name,
                    optimizer_config,
                    self.optimizer_metrics.clone(),
                );
                mz_ore::task::spawn_blocking(
                    || "optimize subscribe",
                    move || {
                        span.in_scope(|| {
                            let _dispatch_guard = explain_ctx.dispatch_guard();

                            let global_mir_plan = optimizer.catch_unwind_optimize(plan.clone())?;
                            let as_of = timestamp_context.timestamp_or_default();

                            if let Some(up_to) = optimizer.up_to() {
                                if as_of > up_to {
                                    return Err(AdapterError::AbsurdSubscribeBounds {
                                        as_of,
                                        up_to,
                                    });
                                }
                            }
                            let local_mir_plan =
                                global_mir_plan.resolve(Antichain::from_elem(as_of));

                            let global_lir_plan =
                                optimizer.catch_unwind_optimize(local_mir_plan)?;
                            let optimization_finished_at = now();

                            let (df_desc, df_meta) = global_lir_plan.unapply();
                            Ok(Execution::Subscribe {
                                subscribe_plan: plan,
                                df_desc,
                                df_meta,
                                optimization_finished_at,
                            })
                        })
                    },
                )
            }
        };

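        // Bound optimization by the session's statement timeout (zero means no timeout).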
        let mut optimization_timeout = *session.vars().statement_timeout();
        if optimization_timeout == Duration::ZERO {
            optimization_timeout = Duration::MAX;
        }
        let optimization_result =
            match tokio::time::timeout(optimization_timeout, optimization_future).await {
                Ok(Ok(result)) => result,
                Ok(Err(AdapterError::Optimizer(err))) => {
                    return Err(AdapterError::Internal(format!(
                        "internal error in optimizer: {}",
                        err
                    )));
                }
                Ok(Err(err)) => {
                    return Err(err);
                }
                Err(_elapsed) => {
                    warn!("optimize peek timed out after {:?}", optimization_timeout);
                    return Err(AdapterError::StatementTimeout);
                }
            };

        if let Some(logging_id) = &statement_logging_id {
            self.log_lifecycle_event(*logging_id, StatementLifecycleEvent::OptimizationFinished);
        }

        Self::assert_read_holds_correct(
            &read_holds,
            &optimization_result,
            &determination,
            target_cluster_id,
            in_immediate_multi_stmt_txn,
        );

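        // Execute the optimized plan according to its kind.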
        match optimization_result {
            Execution::ExplainPlan {
                df_meta,
                explain_ctx,
                optimizer,
                insights_ctx,
            } => {
                let rows = coord::sequencer::explain_plan_inner(
                    session,
                    &catalog,
                    df_meta,
                    explain_ctx,
                    optimizer,
                    insights_ctx,
                )
                .await?;

                Ok(Some(ExecuteResponse::SendingRowsImmediate {
                    rows: Box::new(rows.into_row_iter()),
                }))
            }
            Execution::ExplainPushdown {
                imports,
                determination,
            } => {
                let as_of = determination.timestamp_context.antichain();
                let mz_now = determination
                    .timestamp_context
                    .timestamp()
                    .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
                    .unwrap_or_else(ResultSpec::value_all);

                Ok(Some(
                    coord::sequencer::explain_pushdown_future_inner(
                        session,
                        &*catalog,
                        &self.storage_collections,
                        as_of,
                        mz_now,
                        imports,
                    )
                    .await
                    .await?,
                ))
            }
            Execution::Peek {
                global_lir_plan,
                optimization_finished_at: _optimization_finished_at,
                plan_insights_optimizer_trace,
                finishing,
                copy_to,
                insights_ctx,
            } => {
                let (peek_plan, df_meta, typ) = global_lir_plan.unapply();

                coord::sequencer::emit_optimizer_notices(
                    &*catalog,
                    session,
                    &df_meta.optimizer_notices,
                );

                if let Some(trace) = plan_insights_optimizer_trace {
                    let target_cluster = catalog.get_cluster(target_cluster_id);
                    let features = OptimizerFeatures::from(catalog.system_config())
                        .override_from(&target_cluster.config.features());
                    let insights = trace
                        .into_plan_insights(
                            &features,
                            &catalog.for_session(session),
                            Some(finishing.clone()),
                            Some(target_cluster),
                            df_meta.clone(),
                            insights_ctx,
                        )
                        .await?;
                    session.add_notice(AdapterNotice::PlanInsights(insights));
                }

                let watch_set = statement_logging_id.map(|logging_id| {
                    WatchSetCreation::new(
                        logging_id,
                        catalog.state(),
                        &input_id_bundle,
                        determination.timestamp_context.timestamp_or_default(),
                    )
                });

                let max_result_size = catalog.system_config().max_result_size();

                let determination_for_notice = if session.vars().emit_timestamp_notice() {
                    Some(determination.clone())
                } else {
                    None
                };

                let response = match peek_plan {
                    PeekPlan::FastPath(fast_path_plan) => {
                        if let Some(logging_id) = &statement_logging_id {
                            if !matches!(fast_path_plan, FastPathPlan::Constant(..)) {
                                self.log_set_timestamp(
                                    *logging_id,
                                    determination.timestamp_context.timestamp_or_default(),
                                );
                            }
                        }

                        let row_set_finishing_seconds =
                            session.metrics().row_set_finishing_seconds().clone();

                        let peek_stash_read_batch_size_bytes =
                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
                                .get(catalog.system_config().dyncfgs());
                        let peek_stash_read_memory_budget_bytes =
                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
                                .get(catalog.system_config().dyncfgs());

                        self.implement_fast_path_peek_plan(
                            fast_path_plan,
                            determination.timestamp_context.timestamp_or_default(),
                            finishing,
                            target_cluster_id,
                            target_replica,
                            typ,
                            max_result_size,
                            max_query_result_size,
                            row_set_finishing_seconds,
                            read_holds,
                            peek_stash_read_batch_size_bytes,
                            peek_stash_read_memory_budget_bytes,
                            session.conn_id().clone(),
                            source_ids,
                            watch_set,
                        )
                        .await?
                    }
                    PeekPlan::SlowPath(dataflow_plan) => {
                        if let Some(logging_id) = &statement_logging_id {
                            self.log_set_transient_index_id(*logging_id, dataflow_plan.id);
                        }

                        self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
                            dataflow_plan: Box::new(dataflow_plan),
                            determination,
                            finishing,
                            compute_instance: target_cluster_id,
                            target_replica,
                            intermediate_result_type: typ,
                            source_ids,
                            conn_id: session.conn_id().clone(),
                            max_result_size,
                            max_query_result_size,
                            watch_set,
                            tx,
                        })
                        .await?
                    }
                };

                if let Some(determination) = determination_for_notice {
                    let explanation = self
                        .call_coordinator(|tx| Command::ExplainTimestamp {
                            conn_id: session.conn_id().clone(),
                            session_wall_time: session.pcx().wall_time,
                            cluster_id: target_cluster_id,
                            id_bundle: input_id_bundle.clone(),
                            determination,
                            tx,
                        })
                        .await;
                    session.add_notice(AdapterNotice::QueryTimestamp { explanation });
                }

                Ok(Some(match copy_to {
                    None => response,
                    Some(format) => ExecuteResponse::CopyTo {
                        format,
                        resp: Box::new(response),
                    },
                }))
            }
            Execution::Subscribe {
                subscribe_plan,
                df_desc,
                df_meta,
                optimization_finished_at: _optimization_finished_at,
            } => {
                if df_desc.as_of.as_ref().expect("as of set") == &df_desc.until {
                    session.add_notice(AdapterNotice::EqualSubscribeBounds {
                        bound: *df_desc.until.as_option().expect("as of set"),
                    });
                }
                coord::sequencer::emit_optimizer_notices(
                    &*catalog,
                    session,
                    &df_meta.optimizer_notices,
                );

                let response = self
                    .call_coordinator(|tx| Command::ExecuteSubscribe {
                        df_desc,
                        dependency_ids: subscribe_plan.from.depends_on(),
                        cluster_id: target_cluster_id,
                        replica_id: target_replica,
                        conn_id: session.conn_id().clone(),
                        session_uuid: session.uuid(),
                        read_holds,
                        plan: subscribe_plan,
                        statement_logging_id,
                        tx,
                    })
                    .await?;
                Ok(Some(response))
            }
            Execution::CopyToS3 {
                global_lir_plan,
                source_ids,
            } => {
                let (df_desc, df_meta) = global_lir_plan.unapply();

                coord::sequencer::emit_optimizer_notices(
                    &*catalog,
                    session,
                    &df_meta.optimizer_notices,
                );

                let sink_id = df_desc.sink_id();
                let sinks = &df_desc.sink_exports;
                if sinks.len() != 1 {
                    return Err(AdapterError::Internal(
                        "expected exactly one copy to s3 sink".into(),
                    ));
                }
                let (_, sink_desc) = sinks
                    .first_key_value()
                    .expect("known to be exactly one copy to s3 sink");
                let s3_sink_connection = match &sink_desc.connection {
                    mz_compute_types::sinks::ComputeSinkConnection::CopyToS3Oneshot(conn) => {
                        conn.clone()
                    }
                    _ => {
                        return Err(AdapterError::Internal(
                            "expected copy to s3 oneshot sink".into(),
                        ));
                    }
                };

                self.call_coordinator(|tx| Command::CopyToPreflight {
                    s3_sink_connection,
                    sink_id,
                    tx,
                })
                .await?;

                let watch_set = statement_logging_id.map(|logging_id| {
                    WatchSetCreation::new(
                        logging_id,
                        catalog.state(),
                        &input_id_bundle,
                        determination.timestamp_context.timestamp_or_default(),
                    )
                });

                let response = self
                    .call_coordinator(|tx| Command::ExecuteCopyTo {
                        df_desc: Box::new(df_desc),
                        compute_instance: target_cluster_id,
                        target_replica,
                        source_ids,
                        conn_id: session.conn_id().clone(),
                        watch_set,
                        tx,
                    })
                    .await?;

                Ok(Some(response))
            }
        }
    }

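    /// Determines a query timestamp from the frontend: acquires read holds for
    /// `id_bundle` and delegates the actual selection to
    /// `TimestampProvider::determine_timestamp_for_inner`, recording metrics about the
    /// outcome.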
    pub(crate) async fn frontend_determine_timestamp(
        &mut self,
        session: &Session,
        id_bundle: &CollectionIdBundle,
        when: &QueryWhen,
        compute_instance: ComputeInstanceId,
        timeline_context: &TimelineContext,
        oracle_read_ts: Option<Timestamp>,
        real_time_recency_ts: Option<Timestamp>,
    ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
        let isolation_level = session.vars().transaction_isolation();

        let (read_holds, upper) = self
            .acquire_read_holds_and_least_valid_write(id_bundle)
            .await
            .map_err(|err| {
                AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
                    err,
                    compute_instance,
                )
            })?;
        let (det, read_holds) = <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
            session,
            id_bundle,
            when,
            timeline_context,
            oracle_read_ts,
            real_time_recency_ts,
            isolation_level,
            read_holds,
            upper.clone(),
        )?;

        session
            .metrics()
            .determine_timestamp(&[
                match det.respond_immediately() {
                    true => "true",
                    false => "false",
                },
                isolation_level.as_str(),
                &compute_instance.to_string(),
            ])
            .inc();
        if !det.respond_immediately()
            && isolation_level == &IsolationLevel::StrictSerializable
            && real_time_recency_ts.is_none()
        {
            if let Some(strict) = det.timestamp_context.timestamp() {
                let (serializable_det, _tmp_read_holds) =
                    <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
                        session,
                        id_bundle,
                        when,
                        timeline_context,
                        oracle_read_ts,
                        real_time_recency_ts,
                        &IsolationLevel::Serializable,
                        read_holds.clone(),
                        upper,
                    )?;
                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
                    session
                        .metrics()
                        .timestamp_difference_for_strict_serializable_ms(&[compute_instance
                            .to_string()
                            .as_ref()])
                        .observe(f64::cast_lossy(u64::from(
                            strict.saturating_sub(*serializable),
                        )));
                }
            }
        }

        Ok((det, read_holds))
    }

1580
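    /// Soft-asserts that the acquired read holds cover the imports of the optimized
    /// plan and are not beyond the chosen `as_of`.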
    fn assert_read_holds_correct(
        read_holds: &ReadHolds,
        execution: &Execution,
        determination: &TimestampDetermination,
        target_cluster_id: ClusterId,
        in_immediate_multi_stmt_txn: bool,
    ) {
        let (source_imports, index_imports, as_of, execution_name): (
            Vec<GlobalId>,
            Vec<GlobalId>,
            Timestamp,
            &str,
        ) = match execution {
            Execution::Peek {
                global_lir_plan, ..
            } => match global_lir_plan.peek_plan() {
                PeekPlan::FastPath(fast_path_plan) => {
                    let (sources, indexes) = match fast_path_plan {
                        FastPathPlan::Constant(..) => (vec![], vec![]),
                        FastPathPlan::PeekExisting(_coll_id, idx_id, ..) => (vec![], vec![*idx_id]),
                        FastPathPlan::PeekPersist(global_id, ..) => (vec![*global_id], vec![]),
                    };
                    (
                        sources,
                        indexes,
                        determination.timestamp_context.timestamp_or_default(),
                        "FastPath",
                    )
                }
                PeekPlan::SlowPath(dataflow_plan) => {
                    let as_of = dataflow_plan
                        .desc
                        .as_of
                        .clone()
                        .expect("dataflow has an as_of")
                        .into_element();
                    (
                        dataflow_plan.desc.source_imports.keys().cloned().collect(),
                        dataflow_plan.desc.index_imports.keys().cloned().collect(),
                        as_of,
                        "SlowPath",
                    )
                }
            },
            Execution::CopyToS3 {
                global_lir_plan, ..
            } => {
                let df_desc = global_lir_plan.df_desc();
                let as_of = df_desc
                    .as_of
                    .clone()
                    .expect("dataflow has an as_of")
                    .into_element();
                (
                    df_desc.source_imports.keys().cloned().collect(),
                    df_desc.index_imports.keys().cloned().collect(),
                    as_of,
                    "CopyToS3",
                )
            }
            Execution::ExplainPlan { .. } | Execution::ExplainPushdown { .. } => {
                return;
            }
            Execution::Subscribe { df_desc, .. } => {
                let as_of = df_desc
                    .as_of
                    .clone()
                    .expect("dataflow has an as_of")
                    .into_element();
                (
                    df_desc.source_imports.keys().cloned().collect(),
                    df_desc.index_imports.keys().cloned().collect(),
                    as_of,
                    "Subscribe",
                )
            }
        };

        for id in source_imports.iter() {
            soft_assert_or_log!(
                read_holds.storage_holds.contains_key(id),
                "[{}] missing read hold for the source import {}; (in_immediate_multi_stmt_txn: {})",
                execution_name,
                id,
                in_immediate_multi_stmt_txn,
            );
        }
        for id in index_imports.iter() {
            soft_assert_or_log!(
                read_holds
                    .compute_ids()
                    .map(|(_instance, coll)| coll)
                    .contains(id),
                "[{}] missing read hold for the index import {}; (in_immediate_multi_stmt_txn: {})",
                execution_name,
                id,
                in_immediate_multi_stmt_txn,
            );
        }

        for (id, h) in read_holds.storage_holds.iter() {
            soft_assert_or_log!(
                h.since().less_equal(&as_of),
                "[{}] storage read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
                execution_name,
                h.since(),
                id,
                as_of,
                determination,
                in_immediate_multi_stmt_txn,
            );
        }
        for ((instance, id), h) in read_holds.compute_holds.iter() {
            soft_assert_eq_or_log!(
                *instance,
                target_cluster_id,
                "[{}] the read hold on {} is on the wrong cluster; (in_immediate_multi_stmt_txn: {})",
                execution_name,
                id,
                in_immediate_multi_stmt_txn,
            );
            soft_assert_or_log!(
                h.since().less_equal(&as_of),
                "[{}] compute read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
                execution_name,
                h.since(),
                id,
                as_of,
                determination,
                in_immediate_multi_stmt_txn,
            );
        }
    }
}

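/// The result of the optimizer pipeline, describing how the query is to be executed.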
enum Execution {
    Peek {
        global_lir_plan: optimize::peek::GlobalLirPlan,
        optimization_finished_at: EpochMillis,
        plan_insights_optimizer_trace: Option<OptimizerTrace>,
        finishing: RowSetFinishing,
        copy_to: Option<plan::CopyFormat>,
        insights_ctx: Option<Box<PlanInsightsContext>>,
    },
    Subscribe {
        subscribe_plan: SubscribePlan,
        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
        df_meta: DataflowMetainfo,
        optimization_finished_at: EpochMillis,
    },
    CopyToS3 {
        global_lir_plan: optimize::copy_to::GlobalLirPlan,
        source_ids: BTreeSet<GlobalId>,
    },
    ExplainPlan {
        df_meta: DataflowMetainfo,
        explain_ctx: ExplainPlanContext,
        optimizer: optimize::peek::Optimizer,
        insights_ctx: Option<Box<PlanInsightsContext>>,
    },
    ExplainPushdown {
        imports: BTreeMap<GlobalId, mz_expr::MapFilterProject>,
        determination: TimestampDetermination,
    },
}