1use std::collections::BTreeMap;
11use std::collections::BTreeSet;
12use std::sync::Arc;
13use std::time::Duration;
14
15use itertools::Itertools;
16use mz_adapter_types::dyncfgs::ENABLE_FRONTEND_SUBSCRIBES;
17use mz_compute_types::ComputeInstanceId;
18use mz_compute_types::dataflows::DataflowDescription;
19use mz_controller_types::ClusterId;
20use mz_expr::{CollectionPlan, ResultSpec, RowSetFinishing};
21use mz_ore::cast::{CastFrom, CastLossy};
22use mz_ore::collections::CollectionExt;
23use mz_ore::now::EpochMillis;
24use mz_ore::task::JoinHandle;
25use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
26use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
27use mz_repr::role_id::RoleId;
28use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
29use mz_sql::ast::Raw;
30use mz_sql::catalog::CatalogCluster;
31use mz_sql::plan::Params;
32use mz_sql::plan::{
33 self, Explainee, ExplaineeStatement, Plan, QueryWhen, SelectPlan, SubscribePlan,
34};
35use mz_sql::rbac;
36use mz_sql::session::metadata::SessionMetadata;
37use mz_sql::session::vars::IsolationLevel;
38use mz_sql_parser::ast::{CopyDirection, ExplainStage, ShowStatement, Statement};
39use mz_transform::EmptyStatisticsOracle;
40use mz_transform::dataflow::DataflowMetainfo;
41use opentelemetry::trace::TraceContextExt;
42use timely::progress::Antichain;
43use tracing::{Span, debug, warn};
44use tracing_opentelemetry::OpenTelemetrySpanExt;
45
46use crate::catalog::Catalog;
47use crate::command::Command;
48use crate::coord::peek::{FastPathPlan, PeekPlan};
49use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle};
50use crate::coord::timeline::timedomain_for;
51use crate::coord::timestamp_selection::TimestampDetermination;
52use crate::coord::{
53 Coordinator, CopyToContext, ExecuteContextGuard, ExplainContext, ExplainPlanContext,
54 TargetCluster,
55};
56use crate::explain::insights::PlanInsightsContext;
57use crate::explain::optimizer_trace::OptimizerTrace;
58use crate::optimize::Optimize;
59use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
60use crate::session::{Session, TransactionOps, TransactionStatus};
61use crate::statement_logging::WatchSetCreation;
62use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
63use crate::{
64 AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, PeekClient, ReadHolds,
65 TimelineContext, TimestampContext, TimestampProvider, optimize,
66};
67use crate::{coord, metrics};
68
69impl PeekClient {
70 pub(crate) async fn try_frontend_peek(
78 &mut self,
79 portal_name: &str,
80 catalog: Option<Arc<Catalog>>,
81 session: &mut Session,
82 outer_ctx_extra: &mut Option<ExecuteContextGuard>,
83 ) -> Result<Option<ExecuteResponse>, AdapterError> {
84 if session.vars().emit_trace_id_notice() {
87 let span_context = tracing::Span::current()
88 .context()
89 .span()
90 .span_context()
91 .clone();
92 if span_context.is_valid() {
93 session.add_notice(AdapterNotice::QueryTrace {
94 trace_id: span_context.trace_id(),
95 });
96 }
97 }
98
99 let catalog = match catalog {
106 Some(c) => c,
107 None => self.catalog_snapshot("try_frontend_peek").await,
108 };
109
110 let (stmt, params, logging, lifecycle_timestamps) = {
112 if let Err(err) = Coordinator::verify_portal(&*catalog, session, portal_name) {
113 outer_ctx_extra
114 .take()
115 .and_then(|guard| guard.defuse().retire());
116 return Err(err);
117 }
118 let portal = session
119 .get_portal_unverified(portal_name)
120 .expect("called verify_portal above");
123 let params = portal.parameters.clone();
124 let stmt = portal.stmt.clone();
125 let logging = Arc::clone(&portal.logging);
126 let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
127 (stmt, params, logging, lifecycle_timestamps)
128 };
129
130 if let Some(ref stmt) = stmt {
133 match &**stmt {
134 Statement::Select(_)
135 | Statement::ExplainAnalyzeObject(_)
136 | Statement::ExplainAnalyzeCluster(_)
137 | Statement::Show(ShowStatement::ShowObjects(_))
138 | Statement::Show(ShowStatement::ShowColumns(_)) => {
139 }
144 Statement::ExplainPlan(explain_stmt) => {
145 match &explain_stmt.explainee {
150 mz_sql_parser::ast::Explainee::Select(..) => {
151 }
153 _ => {
154 debug!(
155 "Bailing out from try_frontend_peek, because EXPLAIN is not for a SELECT query"
156 );
157 return Ok(None);
158 }
159 }
160 }
161 Statement::ExplainPushdown(explain_stmt) => {
162 match &explain_stmt.explainee {
164 mz_sql_parser::ast::Explainee::Select(_, false) => {}
165 _ => {
166 debug!(
167 "Bailing out from try_frontend_peek, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is for EXPLAIN BROKEN"
168 );
169 return Ok(None);
170 }
171 }
172 }
173 Statement::Copy(copy_stmt) => {
174 match ©_stmt.direction {
175 CopyDirection::To => {
176 }
178 CopyDirection::From => {
179 debug!(
180 "Bailing out from try_frontend_peek, because COPY FROM is not supported"
181 );
182 return Ok(None);
183 }
184 }
185 }
186
187 Statement::Subscribe(_)
188 if ENABLE_FRONTEND_SUBSCRIBES.get(catalog.system_config().dyncfgs()) =>
189 {
190 }
192 _ => {
193 debug!(
194 "Bailing out from try_frontend_peek, because statement type is not supported"
195 );
196 return Ok(None);
197 }
198 }
199 }
200
201 let logging_guard = self.begin_statement_logging(
204 session,
205 ¶ms,
206 &logging,
207 &catalog,
208 lifecycle_timestamps,
209 outer_ctx_extra,
210 );
211 let statement_logging_id = logging_guard.id();
212 logging_guard.defuse();
220
221 let result = self
222 .try_frontend_peek_inner(session, catalog, stmt, params, statement_logging_id)
223 .await;
224
225 if let Some(logging_id) = statement_logging_id {
228 let reason = match &result {
229 Ok(Some(
233 ExecuteResponse::SendingRowsStreaming { .. }
234 | ExecuteResponse::Subscribing { .. },
235 )) => {
236 return result;
237 }
238 Ok(Some(resp @ ExecuteResponse::CopyTo { resp: inner, .. })) => {
240 match inner.as_ref() {
241 ExecuteResponse::SendingRowsStreaming { .. }
242 | ExecuteResponse::Subscribing { .. } => {
243 return result;
244 }
245 _ => resp.into(),
248 }
249 }
250 Ok(None) => {
252 soft_panic_or_log!(
253 "Bailed out from `try_frontend_peek_inner` after we already logged the beginning of statement execution."
254 );
255 self.log_ended_execution(
258 logging_id,
259 StatementEndedExecutionReason::Errored {
260 error: "Internal error: bailed out from `try_frontend_peek_inner`"
261 .to_string(),
262 },
263 );
264 return result;
265 }
266 Ok(Some(resp)) => resp.into(),
271 Err(e) => StatementEndedExecutionReason::Errored {
272 error: e.to_string(),
273 },
274 };
275
276 self.log_ended_execution(logging_id, reason);
277 }
278
279 result
280 }
281
282 async fn try_frontend_peek_inner(
285 &mut self,
286 session: &mut Session,
287 catalog: Arc<Catalog>,
288 stmt: Option<Arc<Statement<Raw>>>,
289 params: Params,
290 statement_logging_id: Option<crate::statement_logging::StatementLoggingId>,
291 ) -> Result<Option<ExecuteResponse>, AdapterError> {
292 let stmt = match stmt {
293 Some(stmt) => stmt,
294 None => {
295 debug!("try_frontend_peek_inner succeeded on an empty query");
296 return Ok(Some(ExecuteResponse::EmptyQuery));
297 }
298 };
299
300 session
301 .metrics()
302 .query_total(&[
303 metrics::session_type_label_value(session.user()),
304 metrics::statement_type_label_value(&stmt),
305 ])
306 .inc();
307
308 let conn_catalog = catalog.for_session(session);
311 let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?;
314
315 let pcx = session.pcx();
316 let (plan, sql_impl_ids) =
317 mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, ¶ms, &resolved_ids)?;
318
319 enum QueryPlan<'a> {
321 Select(&'a SelectPlan),
322 CopyTo(&'a SelectPlan, CopyToContext),
323 Subscribe(&'a SubscribePlan),
324 }
325
326 let (query_plan, explain_ctx) = match &plan {
327 Plan::Select(select_plan) => {
328 let explain_ctx = if session.vars().emit_plan_insights_notice() {
329 let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
330 ExplainContext::PlanInsightsNotice(optimizer_trace)
331 } else {
332 ExplainContext::None
333 };
334 (QueryPlan::Select(select_plan), explain_ctx)
335 }
336 Plan::ShowColumns(show_columns_plan) => {
337 (
339 QueryPlan::Select(&show_columns_plan.select_plan),
340 ExplainContext::None,
341 )
342 }
343 Plan::ExplainPlan(plan::ExplainPlanPlan {
344 stage,
345 format,
346 config,
347 explainee: Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc }),
348 }) => {
349 let optimizer_trace = OptimizerTrace::new(stage.paths());
351 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
352 broken: *broken,
353 config: config.clone(),
354 format: *format,
355 stage: *stage,
356 replan: None,
357 desc: Some(desc.clone()),
358 optimizer_trace,
359 });
360 (QueryPlan::Select(plan), explain_ctx)
361 }
362 Plan::CopyTo(plan::CopyToPlan {
364 select_plan,
365 desc,
366 to,
367 connection,
368 connection_id,
369 format,
370 max_file_size,
371 }) => {
372 let uri = eval_copy_to_uri(to.clone(), session, catalog.state())?;
373
374 let copy_to_ctx = CopyToContext {
376 desc: desc.clone(),
377 uri,
378 connection: connection.clone(),
379 connection_id: *connection_id,
380 format: format.clone(),
381 max_file_size: *max_file_size,
382 output_batch_count: None,
383 };
384
385 (
386 QueryPlan::CopyTo(select_plan, copy_to_ctx),
387 ExplainContext::None,
388 )
389 }
390 Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => {
391 match explainee {
393 plan::Explainee::Statement(plan::ExplaineeStatement::Select {
394 broken: false,
395 plan,
396 desc: _,
397 }) => {
398 let explain_ctx = ExplainContext::Pushdown;
399 (QueryPlan::Select(plan), explain_ctx)
400 }
401 _ => {
402 soft_panic_or_log!(
405 "unexpected EXPLAIN FILTER PUSHDOWN plan kind in frontend peek sequencing: {:?}",
406 explainee
407 );
408 debug!(
409 "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN"
410 );
411 return Ok(None);
412 }
413 }
414 }
415 Plan::SideEffectingFunc(sef_plan) => {
416 let response = self
420 .call_coordinator(|tx| Command::ExecuteSideEffectingFunc {
421 plan: sef_plan.clone(),
422 conn_id: session.conn_id().clone(),
423 current_role: session.role_metadata().current_role,
424 tx,
425 })
426 .await?;
427 return Ok(Some(response));
428 }
429 Plan::Subscribe(subscribe) => (QueryPlan::Subscribe(subscribe), ExplainContext::None),
430 _ => {
431 soft_panic_or_log!(
434 "Unexpected plan kind in frontend peek sequencing: {:?}",
435 plan
436 );
437 debug!(
438 "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, side-effecting SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO S3"
439 );
440 return Ok(None);
441 }
442 };
443
444 let when = match query_plan {
445 QueryPlan::Select(s) => &s.when,
446 QueryPlan::CopyTo(s, _) => &s.when,
447 QueryPlan::Subscribe(s) => &s.when,
448 };
449
450 let depends_on = match query_plan {
451 QueryPlan::Select(s) => s.source.depends_on(),
452 QueryPlan::CopyTo(s, _) => s.source.depends_on(),
453 QueryPlan::Subscribe(s) => s.from.depends_on(),
454 };
455
456 let contains_temporal = match query_plan {
457 QueryPlan::Select(s) => s.source.contains_temporal(),
458 QueryPlan::CopyTo(s, _) => s.source.contains_temporal(),
459 QueryPlan::Subscribe(s) => s.from.contains_temporal(),
460 };
461
462 assert!(plan.allowed_in_read_only());
466
467 let (cluster, target_cluster_id, target_cluster_name) = {
468 let target_cluster = match session.transaction().cluster() {
469 Some(cluster_id) => TargetCluster::Transaction(cluster_id),
471 None => coord::catalog_serving::auto_run_on_catalog_server(
473 &conn_catalog,
474 session,
475 &plan,
476 ),
477 };
478 let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
479 (cluster, cluster.id, &cluster.name)
480 };
481
482 if let Some(logging_id) = &statement_logging_id {
484 self.log_set_cluster(*logging_id, target_cluster_id, target_cluster_name.clone());
485 }
486
487 coord::catalog_serving::check_cluster_restrictions(
488 target_cluster_name.as_str(),
489 &conn_catalog,
490 &plan,
491 )?;
492
493 rbac::check_plan(
494 &conn_catalog,
495 None::<fn(u32) -> Option<RoleId>>,
498 session,
499 &plan,
500 Some(target_cluster_id),
501 &resolved_ids,
502 &sql_impl_ids,
503 )?;
504
505 if let Some((_, wait_future)) =
506 coord::appends::waiting_on_startup_appends(&*catalog, session, &plan)
507 {
508 wait_future.await;
509 }
510
511 let max_query_result_size = Some(session.vars().max_query_result_size());
512
513 let compute_instance_snapshot =
518 ComputeInstanceSnapshot::new_without_collections(cluster.id());
519
520 let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config())
521 .override_from(&catalog.get_cluster(cluster.id()).config.features())
522 .override_from(
524 &catalog
525 .state()
526 .cluster_scoped_optimizer_overrides(cluster.id()),
527 )
528 .override_from(&explain_ctx);
529
530 if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
531 return Err(AdapterError::NoClusterReplicasAvailable {
532 name: cluster.name.clone(),
533 is_managed: cluster.is_managed(),
534 });
535 }
536
537 let (_, view_id) = self.transient_id_gen.allocate_id();
538 let (_, index_id) = self.transient_id_gen.allocate_id();
539
540 let target_replica_name = session.vars().cluster_replica();
541 let mut target_replica = target_replica_name
542 .map(|name| {
543 cluster
544 .replica_id(name)
545 .ok_or(AdapterError::UnknownClusterReplica {
546 cluster_name: cluster.name.clone(),
547 replica_name: name.to_string(),
548 })
549 })
550 .transpose()?;
551
552 let source_ids = depends_on;
553 let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?;
557 if matches!(timeline_context, TimelineContext::TimestampIndependent) && contains_temporal {
558 timeline_context = TimelineContext::TimestampDependent;
562 }
563
564 let notices = coord::sequencer::check_log_reads(
565 &catalog,
566 cluster,
567 &source_ids,
568 &mut target_replica,
569 session.vars(),
570 )?;
571 session.add_notices(notices);
572
573 let isolation_level = session.vars().transaction_isolation().clone();
576 let timeline = Coordinator::get_timeline(&timeline_context);
577 let needs_linearized_read_ts =
578 Coordinator::needs_linearized_read_ts(&isolation_level, when);
579
580 let oracle_read_ts = match timeline {
581 Some(timeline) if needs_linearized_read_ts => {
582 let oracle = self.ensure_oracle(timeline).await?;
583 let oracle_read_ts = oracle.read_ts().await;
584 Some(oracle_read_ts)
585 }
586 Some(_) | None => None,
587 };
588
589 let vars = session.vars();
592 let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
593 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
594 && !session.contains_read_timestamp()
595 {
596 self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
598 source_ids: source_ids.clone(),
599 real_time_recency_timeout: *vars.real_time_recency_timeout(),
600 tx,
601 })
602 .await?
603 } else {
604 None
605 };
606
607 let dataflow_builder =
610 DataflowBuilder::new(catalog.state(), compute_instance_snapshot.clone());
611 let input_id_bundle = dataflow_builder.sufficient_collections(source_ids.clone());
612
613 let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when)
626 && !matches!(query_plan, QueryPlan::Subscribe { .. });
627
628 let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
630 Some(
633 determination @ TimestampDetermination {
634 timestamp_context: TimestampContext::TimelineTimestamp { .. },
635 ..
636 },
637 ) if in_immediate_multi_stmt_txn => {
638 let txn_read_holds_opt = self
646 .call_coordinator(|tx| Command::GetTransactionReadHoldsBundle {
647 conn_id: session.conn_id().clone(),
648 tx,
649 })
650 .await;
651
652 if let Some(txn_read_holds) = txn_read_holds_opt {
653 let allowed_id_bundle = txn_read_holds.id_bundle();
654 let outside = input_id_bundle.difference(&allowed_id_bundle);
655
656 if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
658 let valid_names =
659 allowed_id_bundle.resolve_names(&*catalog, session.conn_id());
660 let invalid_names = outside.resolve_names(&*catalog, session.conn_id());
661 return Err(AdapterError::RelationOutsideTimeDomain {
662 relations: invalid_names,
663 names: valid_names,
664 });
665 }
666
667 let read_holds = txn_read_holds.subset(&input_id_bundle);
669
670 (determination, read_holds)
671 } else {
672 return Err(AdapterError::Internal(
677 "Missing transaction read holds for multi-statement transaction"
678 .to_string(),
679 ));
680 }
681 }
682 _ => {
683 let timedomain_bundle;
690 let determine_bundle = if in_immediate_multi_stmt_txn {
691 timedomain_bundle = timedomain_for(
695 &*catalog,
696 &dataflow_builder,
697 &source_ids,
698 &timeline_context,
699 session.conn_id(),
700 target_cluster_id,
701 )?;
702 &timedomain_bundle
703 } else {
704 &input_id_bundle
706 };
707 let (determination, read_holds) = self
708 .frontend_determine_timestamp(
709 session,
710 determine_bundle,
711 when,
712 target_cluster_id,
713 &timeline_context,
714 oracle_read_ts,
715 real_time_recency_ts,
716 )
717 .await?;
718
719 if in_immediate_multi_stmt_txn {
723 self.call_coordinator(|tx| Command::StoreTransactionReadHolds {
724 conn_id: session.conn_id().clone(),
725 read_holds: read_holds.clone(),
726 tx,
727 })
728 .await;
729 }
730
731 (determination, read_holds)
732 }
733 };
734
735 {
736 for id in input_id_bundle.iter() {
738 let s = read_holds.storage_holds.contains_key(&id);
739 let c = read_holds
740 .compute_ids()
741 .map(|(_instance, coll)| coll)
742 .contains(&id);
743 soft_assert_or_log!(
744 s || c,
745 "missing read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
746 id,
747 in_immediate_multi_stmt_txn,
748 );
749 }
750
751 for id in input_id_bundle.storage_ids.iter() {
754 soft_assert_or_log!(
755 read_holds.storage_holds.contains_key(id),
756 "missing storage read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
757 id,
758 in_immediate_multi_stmt_txn,
759 );
760 }
761 for id in input_id_bundle
762 .compute_ids
763 .iter()
764 .flat_map(|(_instance, colls)| colls)
765 {
766 soft_assert_or_log!(
767 read_holds
768 .compute_ids()
769 .map(|(_instance, coll)| coll)
770 .contains(id),
771 "missing compute read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
772 id,
773 in_immediate_multi_stmt_txn,
774 );
775 }
776 }
777
778 let requires_linearization = (&explain_ctx).into();
791 let mut transaction_determination = determination.clone();
792 match query_plan {
793 QueryPlan::Subscribe { .. } => {
794 if when.is_transactional() {
795 session.add_transaction_ops(TransactionOps::Subscribe)?;
796 }
797 }
798 QueryPlan::Select(..) | QueryPlan::CopyTo(..) => {
799 if when.is_transactional() {
800 session.add_transaction_ops(TransactionOps::Peeks {
801 determination: transaction_determination,
802 cluster_id: target_cluster_id,
803 requires_linearization,
804 })?;
805 } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
806 transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
808 session.add_transaction_ops(TransactionOps::Peeks {
809 determination: transaction_determination,
810 cluster_id: target_cluster_id,
811 requires_linearization,
812 })?;
813 }
814 }
815 }
816
817 let stats = statistics_oracle(
820 session,
821 &source_ids,
822 &determination.timestamp_context.antichain(),
823 true,
824 catalog.system_config(),
825 &*self.storage_collections,
826 )
827 .await
828 .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
829
830 let timestamp_context = determination.timestamp_context.clone();
833 let session_meta = session.meta();
834 let now = catalog.config().now.clone();
835 let target_cluster_name = target_cluster_name.clone();
836 let needs_plan_insights = explain_ctx.needs_plan_insights();
837 let determination_for_pushdown = if matches!(explain_ctx, ExplainContext::Pushdown) {
838 Some(determination.clone())
841 } else {
842 None
843 };
844
845 let span = Span::current();
846
847 let catalog_for_insights = if needs_plan_insights {
849 Some(Arc::clone(&catalog))
850 } else {
851 None
852 };
853 let mut compute_instances = BTreeMap::new();
854 if needs_plan_insights {
855 for user_cluster in catalog.user_clusters() {
856 let snapshot = ComputeInstanceSnapshot::new_without_collections(user_cluster.id);
857 compute_instances.insert(user_cluster.name.clone(), snapshot);
858 }
859 }
860
861 let source_ids_for_closure = source_ids.clone();
862
863 let optimization_future: JoinHandle<Result<_, AdapterError>> = match query_plan {
864 QueryPlan::CopyTo(select_plan, mut copy_to_ctx) => {
865 let raw_expr = select_plan.source.clone();
866
867 let worker_counts = cluster.replicas().map(|r| {
869 let loc = &r.config.location;
870 loc.workers().unwrap_or_else(|| loc.num_processes())
871 });
872 let max_worker_count = match worker_counts.max() {
873 Some(count) => u64::cast_from(count),
874 None => {
875 return Err(AdapterError::NoClusterReplicasAvailable {
876 name: cluster.name.clone(),
877 is_managed: cluster.is_managed(),
878 });
879 }
880 };
881 copy_to_ctx.output_batch_count = Some(max_worker_count);
882
883 let mut optimizer = optimize::copy_to::Optimizer::new(
884 Arc::clone(&catalog),
885 compute_instance_snapshot,
886 view_id,
887 copy_to_ctx,
888 optimizer_config,
889 self.optimizer_metrics.clone(),
890 );
891
892 mz_ore::task::spawn_blocking(
893 || "optimize copy-to",
894 move || {
895 span.in_scope(|| {
896 let _dispatch_guard = explain_ctx.dispatch_guard();
897
898 let global_lir_plan = optimize::optimize_oneshot(
900 &mut optimizer,
901 raw_expr.clone(),
902 |local_mir_plan| {
903 local_mir_plan.resolve(
904 timestamp_context.clone(),
905 &session_meta,
906 stats,
907 )
908 },
909 )?;
910 Ok(Execution::CopyToS3 {
911 global_lir_plan,
912 source_ids: source_ids_for_closure,
913 })
914 })
915 },
916 )
917 }
918 QueryPlan::Select(select_plan) => {
919 let select_plan = select_plan.clone();
920 let raw_expr = select_plan.source.clone();
921
922 let mut optimizer = optimize::peek::Optimizer::new(
924 Arc::clone(&catalog),
925 compute_instance_snapshot,
926 select_plan.finishing.clone(),
927 view_id,
928 index_id,
929 optimizer_config,
930 self.optimizer_metrics.clone(),
931 );
932
933 mz_ore::task::spawn_blocking(
934 || "optimize peek",
935 move || {
936 span.in_scope(|| {
937 let _dispatch_guard = explain_ctx.dispatch_guard();
938
939 let global_lir_plan_result = optimize::optimize_oneshot(
944 &mut optimizer,
945 raw_expr.clone(),
946 |local_mir_plan| {
947 local_mir_plan.resolve(
948 timestamp_context.clone(),
949 &session_meta,
950 stats,
951 )
952 },
953 )
954 .map_err(AdapterError::from);
955 let optimization_finished_at = now();
956
957 let create_insights_ctx =
958 |optimizer: &optimize::peek::Optimizer,
959 is_notice: bool|
960 -> Option<Box<PlanInsightsContext>> {
961 if !needs_plan_insights {
962 return None;
963 }
964
965 let catalog = catalog_for_insights.as_ref()?;
966
967 let enable_re_optimize = if needs_plan_insights {
968 let dyncfgs = catalog.system_config().dyncfgs();
976 let opt_limit = mz_adapter_types::dyncfgs
977 ::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
978 .get(dyncfgs);
979 !(is_notice && optimizer.duration() > opt_limit)
980 } else {
981 false
982 };
983
984 Some(Box::new(PlanInsightsContext {
985 stmt: select_plan
986 .select
987 .as_deref()
988 .map(Clone::clone)
989 .map(Statement::Select),
990 raw_expr: raw_expr.clone(),
991 catalog: Arc::clone(catalog),
992 compute_instances,
993 target_instance: target_cluster_name,
994 metrics: optimizer.metrics().clone(),
995 finishing: optimizer.finishing().clone(),
996 optimizer_config: optimizer.config().clone(),
997 session: session_meta,
998 timestamp_context,
999 view_id: optimizer.select_id(),
1000 index_id: optimizer.index_id(),
1001 enable_re_optimize,
1002 }))
1003 };
1004
1005 let global_lir_plan = match global_lir_plan_result {
1006 Ok(plan) => plan,
1007 Err(err) => {
1008 let result = if let ExplainContext::Plan(explain_ctx) =
1009 explain_ctx
1010 && explain_ctx.broken
1011 {
1012 tracing::error!(
1014 "error while handling EXPLAIN statement: {}",
1015 err
1016 );
1017 Ok(Execution::ExplainPlan {
1018 df_meta: Default::default(),
1019 explain_ctx,
1020 optimizer,
1021 insights_ctx: None,
1022 })
1023 } else {
1024 Err(err)
1025 };
1026 return result;
1027 }
1028 };
1029
1030 match explain_ctx {
1031 ExplainContext::Plan(explain_ctx) => {
1032 let (_, df_meta, _) = global_lir_plan.unapply();
1033 let insights_ctx = create_insights_ctx(&optimizer, false);
1034 Ok(Execution::ExplainPlan {
1035 df_meta,
1036 explain_ctx,
1037 optimizer,
1038 insights_ctx,
1039 })
1040 }
1041 ExplainContext::None => Ok(Execution::Peek {
1042 global_lir_plan,
1043 optimization_finished_at,
1044 plan_insights_optimizer_trace: None,
1045 finishing: select_plan.finishing,
1046 copy_to: select_plan.copy_to,
1047 insights_ctx: None,
1048 }),
1049 ExplainContext::PlanInsightsNotice(optimizer_trace) => {
1050 let insights_ctx = create_insights_ctx(&optimizer, true);
1051 Ok(Execution::Peek {
1052 global_lir_plan,
1053 optimization_finished_at,
1054 plan_insights_optimizer_trace: Some(optimizer_trace),
1055 finishing: select_plan.finishing,
1056 copy_to: select_plan.copy_to,
1057 insights_ctx,
1058 })
1059 }
1060 ExplainContext::Pushdown => {
1061 let (plan, _, _) = global_lir_plan.unapply();
1062 let imports = match plan {
1063 PeekPlan::SlowPath(plan) => plan
1064 .desc
1065 .source_imports
1066 .into_iter()
1067 .filter_map(|(id, import)| {
1068 import.desc.arguments.operators.map(|mfp| (id, mfp))
1069 })
1070 .collect(),
1071 PeekPlan::FastPath(_) => {
1072 std::collections::BTreeMap::default()
1073 }
1074 };
1075 Ok(Execution::ExplainPushdown {
1076 imports,
1077 determination: determination_for_pushdown
1078 .expect("it's present for the ExplainPushdown case"),
1079 })
1080 }
1081 }
1082 })
1083 },
1084 )
1085 }
1086 QueryPlan::Subscribe(plan) => {
1087 let plan = plan.clone();
1088 let catalog: Arc<Catalog> = Arc::clone(&catalog);
1089 let debug_name = format!("subscribe-{}", index_id);
1090 let mut optimizer = optimize::subscribe::Optimizer::new(
1091 catalog,
1092 compute_instance_snapshot.clone(),
1093 view_id,
1094 index_id,
1095 plan.with_snapshot,
1096 plan.up_to,
1097 debug_name,
1098 optimizer_config,
1099 self.optimizer_metrics.clone(),
1100 );
1101 mz_ore::task::spawn_blocking(
1102 || "optimize subscribe",
1103 move || {
1104 span.in_scope(|| {
1105 let _dispatch_guard = explain_ctx.dispatch_guard();
1106
1107 let global_mir_plan = optimizer.catch_unwind_optimize(plan.clone())?;
1108 let as_of = timestamp_context.timestamp_or_default();
1109
1110 if let Some(up_to) = optimizer.up_to() {
1111 if as_of > up_to {
1112 return Err(AdapterError::AbsurdSubscribeBounds {
1113 as_of,
1114 up_to,
1115 });
1116 }
1117 }
1118 let local_mir_plan =
1119 global_mir_plan.resolve(Antichain::from_elem(as_of));
1120
1121 let global_lir_plan =
1122 optimizer.catch_unwind_optimize(local_mir_plan)?;
1123 let optimization_finished_at = now();
1124
1125 let (df_desc, df_meta) = global_lir_plan.unapply();
1126 Ok(Execution::Subscribe {
1127 subscribe_plan: plan,
1128 df_desc,
1129 df_meta,
1130 optimization_finished_at,
1131 })
1132 })
1133 },
1134 )
1135 }
1136 };
1137
1138 let mut optimization_timeout = *session.vars().statement_timeout();
1139 if optimization_timeout == Duration::ZERO {
1141 optimization_timeout = Duration::MAX;
1142 }
1143 let optimization_result =
1144 match tokio::time::timeout(optimization_timeout, optimization_future).await {
1149 Ok(Ok(result)) => result,
1150 Ok(Err(AdapterError::Optimizer(err))) => {
1151 return Err(AdapterError::Internal(format!(
1152 "internal error in optimizer: {}",
1153 err
1154 )));
1155 }
1156 Ok(Err(err)) => {
1157 return Err(err);
1158 }
1159 Err(_elapsed) => {
1160 warn!("optimize peek timed out after {:?}", optimization_timeout);
1161 return Err(AdapterError::StatementTimeout);
1162 }
1163 };
1164
1165 if let Some(logging_id) = &statement_logging_id {
1167 self.log_lifecycle_event(*logging_id, StatementLifecycleEvent::OptimizationFinished);
1168 }
1169
1170 Self::assert_read_holds_correct(
1172 &read_holds,
1173 &optimization_result,
1174 &determination,
1175 target_cluster_id,
1176 in_immediate_multi_stmt_txn,
1177 );
1178
1179 match optimization_result {
1181 Execution::ExplainPlan {
1182 df_meta,
1183 explain_ctx,
1184 optimizer,
1185 insights_ctx,
1186 } => {
1187 let rows = coord::sequencer::explain_plan_inner(
1188 session,
1189 &catalog,
1190 df_meta,
1191 explain_ctx,
1192 optimizer,
1193 insights_ctx,
1194 )
1195 .await?;
1196
1197 Ok(Some(ExecuteResponse::SendingRowsImmediate {
1198 rows: Box::new(rows.into_row_iter()),
1199 }))
1200 }
1201 Execution::ExplainPushdown {
1202 imports,
1203 determination,
1204 } => {
1205 let as_of = determination.timestamp_context.antichain();
1208 let mz_now = determination
1209 .timestamp_context
1210 .timestamp()
1211 .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1212 .unwrap_or_else(ResultSpec::value_all);
1213
1214 Ok(Some(
1215 coord::sequencer::explain_pushdown_future_inner(
1216 session,
1217 &*catalog,
1218 &self.storage_collections,
1219 as_of,
1220 mz_now,
1221 imports,
1222 )
1223 .await
1224 .await?,
1225 ))
1226 }
1227 Execution::Peek {
1228 global_lir_plan,
1229 optimization_finished_at: _optimization_finished_at,
1230 plan_insights_optimizer_trace,
1231 finishing,
1232 copy_to,
1233 insights_ctx,
1234 } => {
1235 let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
1240
1241 coord::sequencer::emit_optimizer_notices(
1242 &*catalog,
1243 session,
1244 &df_meta.optimizer_notices,
1245 );
1246
1247 if let Some(trace) = plan_insights_optimizer_trace {
1249 let target_cluster = catalog.get_cluster(target_cluster_id);
1250 let features = OptimizerFeatures::from(catalog.system_config())
1251 .override_from(&target_cluster.config.features())
1252 .override_from(
1255 &catalog
1256 .state()
1257 .cluster_scoped_optimizer_overrides(target_cluster_id),
1258 );
1259 let insights = trace
1260 .into_plan_insights(
1261 &features,
1262 &catalog.for_session(session),
1263 Some(finishing.clone()),
1264 Some(target_cluster),
1265 df_meta.clone(),
1266 insights_ctx,
1267 )
1268 .await?;
1269 session.add_notice(AdapterNotice::PlanInsights(insights));
1270 }
1271
1272 let watch_set = statement_logging_id.map(|logging_id| {
1275 WatchSetCreation::new(
1276 logging_id,
1277 catalog.state(),
1278 &input_id_bundle,
1279 determination.timestamp_context.timestamp_or_default(),
1280 )
1281 });
1282
1283 let max_result_size = catalog.system_config().max_result_size();
1284
1285 let determination_for_notice = if session.vars().emit_timestamp_notice() {
1288 Some(determination.clone())
1289 } else {
1290 None
1291 };
1292
1293 let response = match peek_plan {
1294 PeekPlan::FastPath(fast_path_plan) => {
1295 if let Some(logging_id) = &statement_logging_id {
1296 if !matches!(fast_path_plan, FastPathPlan::Constant(..)) {
1310 self.log_set_timestamp(
1311 *logging_id,
1312 determination.timestamp_context.timestamp_or_default(),
1313 );
1314 }
1315 }
1316
1317 let row_set_finishing_seconds =
1318 session.metrics().row_set_finishing_seconds().clone();
1319
1320 let peek_stash_read_batch_size_bytes =
1321 mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
1322 .get(catalog.system_config().dyncfgs());
1323 let peek_stash_read_memory_budget_bytes =
1324 mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
1325 .get(catalog.system_config().dyncfgs());
1326
1327 self.implement_fast_path_peek_plan(
1328 fast_path_plan,
1329 determination.timestamp_context.timestamp_or_default(),
1330 finishing,
1331 target_cluster_id,
1332 target_replica,
1333 typ,
1334 max_result_size,
1335 max_query_result_size,
1336 row_set_finishing_seconds,
1337 read_holds,
1338 peek_stash_read_batch_size_bytes,
1339 peek_stash_read_memory_budget_bytes,
1340 session.conn_id().clone(),
1341 source_ids,
1342 watch_set,
1343 )
1344 .await?
1345 }
1346 PeekPlan::SlowPath(dataflow_plan) => {
1347 if let Some(logging_id) = &statement_logging_id {
1348 self.log_set_transient_index_id(*logging_id, dataflow_plan.id);
1349 }
1350
1351 self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
1352 dataflow_plan: Box::new(dataflow_plan),
1353 determination,
1354 finishing,
1355 compute_instance: target_cluster_id,
1356 target_replica,
1357 intermediate_result_type: typ,
1358 source_ids,
1359 conn_id: session.conn_id().clone(),
1360 max_result_size,
1361 max_query_result_size,
1362 watch_set,
1363 tx,
1364 })
1365 .await?
1366 }
1367 };
1368
1369 if let Some(determination) = determination_for_notice {
1371 let explanation = self
1372 .call_coordinator(|tx| Command::ExplainTimestamp {
1373 conn_id: session.conn_id().clone(),
1374 session_wall_time: session.pcx().wall_time,
1375 cluster_id: target_cluster_id,
1376 id_bundle: input_id_bundle.clone(),
1377 determination,
1378 tx,
1379 })
1380 .await;
1381 session.add_notice(AdapterNotice::QueryTimestamp { explanation });
1382 }
1383
1384 Ok(Some(match copy_to {
1385 None => response,
1386 Some(format) => ExecuteResponse::CopyTo {
1388 format,
1389 resp: Box::new(response),
1390 },
1391 }))
1392 }
1393 Execution::Subscribe {
1394 subscribe_plan,
1395 df_desc,
1396 df_meta,
1397 optimization_finished_at: _optimization_finished_at,
1398 } => {
1399 if df_desc.as_of.as_ref().expect("as of set") == &df_desc.until {
1400 session.add_notice(AdapterNotice::EqualSubscribeBounds {
1401 bound: *df_desc.until.as_option().expect("as of set"),
1402 });
1403 }
1404 coord::sequencer::emit_optimizer_notices(
1405 &*catalog,
1406 session,
1407 &df_meta.optimizer_notices,
1408 );
1409
1410 let response = self
1411 .call_coordinator(|tx| Command::ExecuteSubscribe {
1412 df_desc,
1413 dependency_ids: subscribe_plan.from.depends_on(),
1414 cluster_id: target_cluster_id,
1415 replica_id: target_replica,
1416 conn_id: session.conn_id().clone(),
1417 session_uuid: session.uuid(),
1418 read_holds,
1419 plan: subscribe_plan,
1420 statement_logging_id,
1421 tx,
1422 })
1423 .await?;
1424 Ok(Some(response))
1425 }
1426 Execution::CopyToS3 {
1427 global_lir_plan,
1428 source_ids,
1429 } => {
1430 let (df_desc, df_meta) = global_lir_plan.unapply();
1431
1432 coord::sequencer::emit_optimizer_notices(
1433 &*catalog,
1434 session,
1435 &df_meta.optimizer_notices,
1436 );
1437
1438 let sink_id = df_desc.sink_id();
1440 let sinks = &df_desc.sink_exports;
1441 if sinks.len() != 1 {
1442 return Err(AdapterError::Internal(
1443 "expected exactly one copy to s3 sink".into(),
1444 ));
1445 }
1446 let (_, sink_desc) = sinks
1447 .first_key_value()
1448 .expect("known to be exactly one copy to s3 sink");
1449 let s3_sink_connection = match &sink_desc.connection {
1450 mz_compute_types::sinks::ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1451 conn.clone()
1452 }
1453 _ => {
1454 return Err(AdapterError::Internal(
1455 "expected copy to s3 oneshot sink".into(),
1456 ));
1457 }
1458 };
1459
1460 self.call_coordinator(|tx| Command::CopyToPreflight {
1463 s3_sink_connection,
1464 sink_id,
1465 tx,
1466 })
1467 .await?;
1468
1469 let watch_set = statement_logging_id.map(|logging_id| {
1471 WatchSetCreation::new(
1472 logging_id,
1473 catalog.state(),
1474 &input_id_bundle,
1475 determination.timestamp_context.timestamp_or_default(),
1476 )
1477 });
1478
1479 let response = self
1480 .call_coordinator(|tx| Command::ExecuteCopyTo {
1481 df_desc: Box::new(df_desc),
1482 compute_instance: target_cluster_id,
1483 target_replica,
1484 source_ids,
1485 conn_id: session.conn_id().clone(),
1486 watch_set,
1487 tx,
1488 })
1489 .await?;
1490
1491 Ok(Some(response))
1492 }
1493 }
1494 }
1495
1496 pub(crate) async fn frontend_determine_timestamp(
1503 &mut self,
1504 session: &Session,
1505 id_bundle: &CollectionIdBundle,
1506 when: &QueryWhen,
1507 compute_instance: ComputeInstanceId,
1508 timeline_context: &TimelineContext,
1509 oracle_read_ts: Option<Timestamp>,
1510 real_time_recency_ts: Option<Timestamp>,
1511 ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
1512 let isolation_level = session.vars().transaction_isolation();
1515
1516 let (read_holds, upper) = self
1517 .acquire_read_holds_and_least_valid_write(id_bundle)
1518 .await
1519 .map_err(|err| {
1520 AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
1521 err,
1522 compute_instance,
1523 )
1524 })?;
1525 let (det, read_holds) = <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1526 session,
1527 id_bundle,
1528 when,
1529 timeline_context,
1530 oracle_read_ts,
1531 real_time_recency_ts,
1532 isolation_level,
1533 read_holds,
1534 upper.clone(),
1535 )?;
1536
1537 session
1538 .metrics()
1539 .determine_timestamp(&[
1540 match det.respond_immediately() {
1541 true => "true",
1542 false => "false",
1543 },
1544 isolation_level.as_variant_str(),
1545 &compute_instance.to_string(),
1546 ])
1547 .inc();
1548 if !det.respond_immediately()
1549 && isolation_level == &IsolationLevel::StrictSerializable
1550 && real_time_recency_ts.is_none()
1551 {
1552 if let Some(strict) = det.timestamp_context.timestamp() {
1554 let (serializable_det, _tmp_read_holds) =
1555 <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1556 session,
1557 id_bundle,
1558 when,
1559 timeline_context,
1560 oracle_read_ts,
1561 real_time_recency_ts,
1562 &IsolationLevel::Serializable,
1563 read_holds.clone(),
1564 upper.clone(),
1565 )?;
1566 if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
1567 session
1568 .metrics()
1569 .timestamp_difference_for_strict_serializable_ms(&[compute_instance
1570 .to_string()
1571 .as_ref()])
1572 .observe(f64::cast_lossy(u64::from(
1573 strict.saturating_sub(*serializable),
1574 )));
1575 }
1576 }
1577 }
1578 if !det.respond_immediately()
1579 && isolation_level.is_bounded_staleness()
1580 && real_time_recency_ts.is_none()
1581 {
1582 if let Some(bs_ts) = det.timestamp_context.timestamp() {
1584 let (serializable_det, _tmp_read_holds) =
1585 <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1586 session,
1587 id_bundle,
1588 when,
1589 timeline_context,
1590 oracle_read_ts,
1591 real_time_recency_ts,
1592 &IsolationLevel::Serializable,
1593 read_holds.clone(),
1594 upper,
1595 )?;
1596 if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
1597 session
1598 .metrics()
1599 .timestamp_difference_for_bounded_staleness_ms(&[compute_instance
1600 .to_string()
1601 .as_ref()])
1602 .observe(f64::cast_lossy(u64::from(
1603 serializable.saturating_sub(*bs_ts),
1604 )));
1605 }
1606 }
1607 }
1608
1609 Ok((det, read_holds))
1610 }
1611
1612 fn assert_read_holds_correct(
1613 read_holds: &ReadHolds,
1614 execution: &Execution,
1615 determination: &TimestampDetermination,
1616 target_cluster_id: ClusterId,
1617 in_immediate_multi_stmt_txn: bool,
1618 ) {
1619 let (source_imports, index_imports, as_of, execution_name): (
1621 Vec<GlobalId>,
1622 Vec<GlobalId>,
1623 Timestamp,
1624 &str,
1625 ) = match execution {
1626 Execution::Peek {
1627 global_lir_plan, ..
1628 } => match global_lir_plan.peek_plan() {
1629 PeekPlan::FastPath(fast_path_plan) => {
1630 let (sources, indexes) = match fast_path_plan {
1631 FastPathPlan::Constant(..) => (vec![], vec![]),
1632 FastPathPlan::PeekExisting(_coll_id, idx_id, ..) => (vec![], vec![*idx_id]),
1633 FastPathPlan::PeekPersist(global_id, ..) => (vec![*global_id], vec![]),
1634 };
1635 (
1636 sources,
1637 indexes,
1638 determination.timestamp_context.timestamp_or_default(),
1639 "FastPath",
1640 )
1641 }
1642 PeekPlan::SlowPath(dataflow_plan) => {
1643 let as_of = dataflow_plan
1644 .desc
1645 .as_of
1646 .clone()
1647 .expect("dataflow has an as_of")
1648 .into_element();
1649 (
1650 dataflow_plan.desc.source_imports.keys().cloned().collect(),
1651 dataflow_plan.desc.index_imports.keys().cloned().collect(),
1652 as_of,
1653 "SlowPath",
1654 )
1655 }
1656 },
1657 Execution::CopyToS3 {
1658 global_lir_plan, ..
1659 } => {
1660 let df_desc = global_lir_plan.df_desc();
1661 let as_of = df_desc
1662 .as_of
1663 .clone()
1664 .expect("dataflow has an as_of")
1665 .into_element();
1666 (
1667 df_desc.source_imports.keys().cloned().collect(),
1668 df_desc.index_imports.keys().cloned().collect(),
1669 as_of,
1670 "CopyToS3",
1671 )
1672 }
1673 Execution::ExplainPlan { .. } | Execution::ExplainPushdown { .. } => {
1674 return;
1676 }
1677 Execution::Subscribe { df_desc, .. } => {
1678 let as_of = df_desc
1679 .as_of
1680 .clone()
1681 .expect("dataflow has an as_of")
1682 .into_element();
1683 (
1684 df_desc.source_imports.keys().cloned().collect(),
1685 df_desc.index_imports.keys().cloned().collect(),
1686 as_of,
1687 "Subscribe",
1688 )
1689 }
1690 };
1691
1692 for id in source_imports.iter() {
1694 soft_assert_or_log!(
1695 read_holds.storage_holds.contains_key(id),
1696 "[{}] missing read hold for the source import {}; (in_immediate_multi_stmt_txn: {})",
1697 execution_name,
1698 id,
1699 in_immediate_multi_stmt_txn,
1700 );
1701 }
1702 for id in index_imports.iter() {
1703 soft_assert_or_log!(
1704 read_holds
1705 .compute_ids()
1706 .map(|(_instance, coll)| coll)
1707 .contains(id),
1708 "[{}] missing read hold for the index import {}; (in_immediate_multi_stmt_txn: {})",
1709 execution_name,
1710 id,
1711 in_immediate_multi_stmt_txn,
1712 );
1713 }
1714
1715 for (id, h) in read_holds.storage_holds.iter() {
1717 soft_assert_or_log!(
1718 h.since().less_equal(&as_of),
1719 "[{}] storage read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1720 execution_name,
1721 h.since(),
1722 id,
1723 as_of,
1724 determination,
1725 in_immediate_multi_stmt_txn,
1726 );
1727 }
1728 for ((instance, id), h) in read_holds.compute_holds.iter() {
1729 soft_assert_eq_or_log!(
1730 *instance,
1731 target_cluster_id,
1732 "[{}] the read hold on {} is on the wrong cluster; (in_immediate_multi_stmt_txn: {})",
1733 execution_name,
1734 id,
1735 in_immediate_multi_stmt_txn,
1736 );
1737 soft_assert_or_log!(
1738 h.since().less_equal(&as_of),
1739 "[{}] compute read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1740 execution_name,
1741 h.since(),
1742 id,
1743 as_of,
1744 determination,
1745 in_immediate_multi_stmt_txn,
1746 );
1747 }
1748 }
1749}
1750
1751enum Execution {
1753 Peek {
1754 global_lir_plan: optimize::peek::GlobalLirPlan,
1755 optimization_finished_at: EpochMillis,
1756 plan_insights_optimizer_trace: Option<OptimizerTrace>,
1757 finishing: RowSetFinishing,
1758 copy_to: Option<plan::CopyFormat>,
1759 insights_ctx: Option<Box<PlanInsightsContext>>,
1760 },
1761 Subscribe {
1762 subscribe_plan: SubscribePlan,
1763 df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
1764 df_meta: DataflowMetainfo,
1765 optimization_finished_at: EpochMillis,
1766 },
1767 CopyToS3 {
1768 global_lir_plan: optimize::copy_to::GlobalLirPlan,
1769 source_ids: BTreeSet<GlobalId>,
1770 },
1771 ExplainPlan {
1772 df_meta: DataflowMetainfo,
1773 explain_ctx: ExplainPlanContext,
1774 optimizer: optimize::peek::Optimizer,
1775 insights_ctx: Option<Box<PlanInsightsContext>>,
1776 },
1777 ExplainPushdown {
1778 imports: BTreeMap<GlobalId, mz_expr::MapFilterProject>,
1779 determination: TimestampDetermination,
1780 },
1781}