use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

use itertools::Either;
use mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION;
use mz_compute_types::sinks::ComputeSinkConnection;
use mz_controller_types::ClusterId;
use mz_expr::{CollectionPlan, ResultSpec};
use mz_ore::cast::CastFrom;
use mz_ore::instrument;
use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
use mz_repr::{Datum, GlobalId, Timestamp};
use mz_sql::ast::{ExplainStage, Statement};
use mz_sql::catalog::CatalogCluster;
use mz_sql::plan::{self, QueryWhen};
use mz_sql::session::metadata::SessionMetadata;
use mz_transform::EmptyStatisticsOracle;
use tokio::sync::oneshot;
use tracing::{Instrument, Span, warn};

use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
use crate::command::ExecuteResponse;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::peek::{self, PeekDataflowPlan, PeekPlan, PlannedPeek};
use crate::coord::sequencer::inner::return_if_err;
use crate::coord::sequencer::{check_log_reads, emit_optimizer_notices, eval_copy_to_uri};
use crate::coord::timeline::{TimelineContext, timedomain_for};
use crate::coord::timestamp_selection::{
    TimestampContext, TimestampDetermination, TimestampProvider,
};
use crate::coord::{
    Coordinator, CopyToContext, ExecuteContext, ExplainContext, ExplainPlanContext, Message,
    PeekStage, PeekStageCopyTo, PeekStageExplainPlan, PeekStageExplainPushdown, PeekStageFinish,
    PeekStageLinearizeTimestamp, PeekStageOptimize, PeekStageRealTimeRecency,
    PeekStageTimestampReadHold, PlanValidity, StageResult, Staged, TargetCluster,
};
use crate::error::AdapterError;
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::OptimizerTrace;
use crate::notice::AdapterNotice;
use crate::optimize::{self, Optimize};
use crate::session::{RequireLinearization, Session, TransactionOps, TransactionStatus};
use crate::statement_logging::{StatementLifecycleEvent, WatchSetCreation};

impl Staged for PeekStage {
    type Ctx = ExecuteContext;

    fn validity(&mut self) -> &mut PlanValidity {
        match self {
            PeekStage::LinearizeTimestamp(stage) => &mut stage.validity,
            PeekStage::RealTimeRecency(stage) => &mut stage.validity,
            PeekStage::TimestampReadHold(stage) => &mut stage.validity,
            PeekStage::Optimize(stage) => &mut stage.validity,
            PeekStage::Finish(stage) => &mut stage.validity,
            PeekStage::ExplainPlan(stage) => &mut stage.validity,
            PeekStage::ExplainPushdown(stage) => &mut stage.validity,
            PeekStage::CopyToPreflight(stage) => &mut stage.validity,
            PeekStage::CopyToDataflow(stage) => &mut stage.validity,
        }
    }

    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut ExecuteContext,
    ) -> Result<StageResult<Box<Self>>, AdapterError> {
        match self {
            PeekStage::LinearizeTimestamp(stage) => {
                coord.peek_linearize_timestamp(ctx.session(), stage).await
            }
            PeekStage::RealTimeRecency(stage) => {
                coord.peek_real_time_recency(ctx.session(), stage).await
            }
            PeekStage::TimestampReadHold(stage) => {
                coord.peek_timestamp_read_hold(ctx.session_mut(), stage)
            }
            PeekStage::Optimize(stage) => coord.peek_optimize(ctx.session(), stage).await,
            PeekStage::Finish(stage) => coord.peek_finish(ctx, stage).await,
            PeekStage::ExplainPlan(stage) => coord.peek_explain_plan(ctx.session(), stage).await,
            PeekStage::ExplainPushdown(stage) => {
                coord.peek_explain_pushdown(ctx.session(), stage).await
            }
            PeekStage::CopyToPreflight(stage) => coord.peek_copy_to_preflight(stage).await,
            PeekStage::CopyToDataflow(stage) => coord.peek_copy_to_dataflow(ctx, stage).await,
        }
    }

    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
        Message::PeekStageReady {
            ctx,
            span,
            stage: self,
        }
    }

    fn cancel_enabled(&self) -> bool {
        true
    }
}

impl Coordinator {
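    /// Sequences a peek (`SELECT`): validates the plan and kicks off the staged
    /// peek pipeline. If the session requests plan-insights notices, an
    /// `ExplainContext::PlanInsightsNotice` is threaded through so the optimizer
    /// trace can be turned into an `AdapterNotice::PlanInsights` at the end.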
    #[instrument]
    pub(crate) async fn sequence_peek(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::SelectPlan,
        target_cluster: TargetCluster,
        max_query_result_size: Option<u64>,
    ) {
        let explain_ctx = if ctx.session().vars().emit_plan_insights_notice() {
            let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
            ExplainContext::PlanInsightsNotice(optimizer_trace)
        } else {
            ExplainContext::None
        };

        let stage = return_if_err!(
            self.peek_validate(
                ctx.session(),
                plan,
                target_cluster,
                None,
                explain_ctx,
                max_query_result_size
            ),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

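    /// Sequences a `COPY ... TO '<url>'` statement: evaluates the destination URI
    /// and then runs the regular peek pipeline with a `CopyToContext` attached.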
    #[instrument]
    pub(crate) async fn sequence_copy_to(
        &mut self,
        ctx: ExecuteContext,
        plan::CopyToPlan {
            select_plan,
            desc,
            to,
            connection,
            connection_id,
            format,
            max_file_size,
        }: plan::CopyToPlan,
        target_cluster: TargetCluster,
    ) {
        let uri = return_if_err!(
            eval_copy_to_uri(to, ctx.session(), self.catalog().state()),
            ctx
        );

        let stage = return_if_err!(
            self.peek_validate(
                ctx.session(),
                select_plan,
                target_cluster,
                Some(CopyToContext {
                    desc,
                    uri,
                    connection,
                    connection_id,
                    format,
                    max_file_size,
                    // Set in `peek_validate`, based on the target cluster's
                    // worker count.
                    output_batch_count: None,
                }),
                ExplainContext::None,
                Some(ctx.session().vars().max_query_result_size()),
            ),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

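    /// Sequences an `EXPLAIN` of a `SELECT` statement by running the peek pipeline
    /// with an `ExplainContext::Plan` that collects the optimizer trace.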
    #[instrument]
    pub(crate) async fn explain_peek(
        &mut self,
        ctx: ExecuteContext,
        plan::ExplainPlanPlan {
            stage,
            format,
            config,
            explainee,
        }: plan::ExplainPlanPlan,
        target_cluster: TargetCluster,
    ) {
        let plan::Explainee::Statement(stmt) = explainee else {
            // Callers are expected to have asserted this already.
            unreachable!()
        };
        let plan::ExplaineeStatement::Select { broken, plan, desc } = stmt else {
            // Callers are expected to have asserted this already.
            unreachable!()
        };

        // Create an OptimizerTrace instance to collect plans emitted when
        // executing the optimizer pipeline.
        let optimizer_trace = OptimizerTrace::new(stage.paths());

        let stage = return_if_err!(
            self.peek_validate(
                ctx.session(),
                plan,
                target_cluster,
                None,
                ExplainContext::Plan(ExplainPlanContext {
                    broken,
                    config,
                    format,
                    stage,
                    replan: None,
                    desc: Some(desc),
                    optimizer_trace,
                }),
                Some(ctx.session().vars().max_query_result_size()),
            ),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

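    /// Does some simple validation and builds the optimizer for the peek. Most
    /// validation must be deferred until after any off-thread work completes, so
    /// this only checks what can be checked synchronously.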
    #[instrument]
    pub fn peek_validate(
        &self,
        session: &Session,
        plan: mz_sql::plan::SelectPlan,
        target_cluster: TargetCluster,
        copy_to_ctx: Option<CopyToContext>,
        explain_ctx: ExplainContext,
        max_query_result_size: Option<u64>,
    ) -> Result<PeekStage, AdapterError> {
        let catalog = self.owned_catalog();
        let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
        let compute_instance = self
            .instance_snapshot(cluster.id())
            .expect("compute instance does not exist");
        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
            .override_from(&self.catalog.get_cluster(cluster.id()).config.features())
            .override_from(&explain_ctx);

        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
            return Err(AdapterError::NoClusterReplicasAvailable {
                name: cluster.name.clone(),
                is_managed: cluster.is_managed(),
            });
        }

        let optimizer = match copy_to_ctx {
            None => {
                let (_, view_id) = self.allocate_transient_id();
                let (_, index_id) = self.allocate_transient_id();

                // Build an optimizer for this SELECT.
                Either::Left(optimize::peek::Optimizer::new(
                    Arc::clone(&catalog),
                    compute_instance,
                    plan.finishing.clone(),
                    view_id,
                    index_id,
                    optimizer_config,
                    self.optimizer_metrics(),
                ))
            }
            Some(mut copy_to_ctx) => {
                // Use the worker count of the largest replica to determine how
                // many batches to split the COPY TO output into.
                let worker_counts = cluster.replicas().map(|r| {
                    let loc = &r.config.location;
                    loc.workers().unwrap_or_else(|| loc.num_processes())
                });
                let max_worker_count = match worker_counts.max() {
                    Some(count) => u64::cast_from(count),
                    None => {
                        return Err(AdapterError::NoClusterReplicasAvailable {
                            name: cluster.name.clone(),
                            is_managed: cluster.is_managed(),
                        });
                    }
                };
                copy_to_ctx.output_batch_count = Some(max_worker_count);
                let (_, view_id) = self.allocate_transient_id();
                // Build an optimizer for this COPY TO.
                Either::Right(optimize::copy_to::Optimizer::new(
                    Arc::clone(&catalog),
                    compute_instance,
                    view_id,
                    copy_to_ctx,
                    optimizer_config,
                    self.optimizer_metrics(),
                ))
            }
        };

        let target_replica_name = session.vars().cluster_replica();
        let mut target_replica = target_replica_name
            .map(|name| {
                cluster
                    .replica_id(name)
                    .ok_or(AdapterError::UnknownClusterReplica {
                        cluster_name: cluster.name.clone(),
                        replica_name: name.to_string(),
                    })
            })
            .transpose()?;

        let source_ids = plan.source.depends_on();
        let mut timeline_context = self
            .catalog()
            .validate_timeline_context(source_ids.iter().copied())?;
        if matches!(timeline_context, TimelineContext::TimestampIndependent)
            && plan.source.contains_temporal()?
        {
            // If the sources are timestamp independent but the query contains
            // temporal functions (e.g. `mz_now()`), then the timeline context
            // needs to be upgraded to timestamp dependent.
            timeline_context = TimelineContext::TimestampDependent;
        }

        let notices = check_log_reads(
            &catalog,
            cluster,
            &source_ids,
            &mut target_replica,
            session.vars(),
        )?;
        session.add_notices(notices);

        let dependencies = source_ids
            .iter()
            .map(|id| self.catalog.resolve_item_id(id))
            .collect();
        let validity = PlanValidity::new(
            catalog.transient_revision(),
            dependencies,
            Some(cluster.id()),
            target_replica,
            session.role_metadata().clone(),
        );

        Ok(PeekStage::LinearizeTimestamp(PeekStageLinearizeTimestamp {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            optimizer,
            explain_ctx,
        }))
    }

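    /// If the peek requires a linearized read timestamp, obtains it from the
    /// timeline's timestamp oracle in a spawned task; otherwise proceeds directly
    /// to the real-time recency stage.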
    #[instrument]
    async fn peek_linearize_timestamp(
        &self,
        session: &Session,
        PeekStageLinearizeTimestamp {
            validity,
            source_ids,
            plan,
            max_query_result_size,
            target_replica,
            timeline_context,
            optimizer,
            explain_ctx,
        }: PeekStageLinearizeTimestamp,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let isolation_level = session.vars().transaction_isolation().clone();
        let timeline = Coordinator::get_timeline(&timeline_context);
        let needs_linearized_read_ts =
            Coordinator::needs_linearized_read_ts(&isolation_level, &plan.when);

        let build_stage = move |oracle_read_ts: Option<Timestamp>| PeekStageRealTimeRecency {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            oracle_read_ts,
            optimizer,
            explain_ctx,
        };

        match timeline {
            Some(timeline) if needs_linearized_read_ts => {
                let oracle = self.get_timestamp_oracle(&timeline);

                // Reading a timestamp off the oracle can block, so do it in a
                // spawned task rather than on the main coordinator loop.
                let span = Span::current();
                Ok(StageResult::Handle(mz_ore::task::spawn(
                    || "linearize timestamp",
                    async move {
                        let oracle_read_ts = oracle.read_ts().await;
                        let stage = build_stage(Some(oracle_read_ts));
                        let stage = PeekStage::RealTimeRecency(stage);
                        Ok(Box::new(stage))
                    }
                    .instrument(span),
                )))
            }
            Some(_) | None => {
                let stage = build_stage(None);
                let stage = PeekStage::RealTimeRecency(stage);
                Ok(StageResult::Immediate(Box::new(stage)))
            }
        }
    }

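    /// Determines the timestamp for the peek and acquires read holds on the
    /// collections it needs, then advances to the optimize stage.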
    #[instrument]
    fn peek_timestamp_read_hold(
        &mut self,
        session: &mut Session,
        PeekStageTimestampReadHold {
            mut validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            oracle_read_ts,
            real_time_recency_ts,
            optimizer,
            explain_ctx,
        }: PeekStageTimestampReadHold,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let cluster_id = match optimizer.as_ref() {
            Either::Left(optimizer) => optimizer.cluster_id(),
            Either::Right(optimizer) => optimizer.cluster_id(),
        };
        let id_bundle = self
            .dataflow_builder(cluster_id)
            .sufficient_collections(source_ids.iter().copied());

        // The dependencies of the peek are not just the direct `source_ids`, but
        // the full set of collections sufficient to serve it, so extend the
        // validity set accordingly.
        let item_ids = id_bundle
            .iter()
            .map(|id| self.catalog().resolve_item_id(&id));
        validity.extend_dependencies(item_ids);

        let determination = self.sequence_peek_timestamp(
            session,
            &plan.when,
            cluster_id,
            timeline_context,
            oracle_read_ts,
            &id_bundle,
            &source_ids,
            real_time_recency_ts,
            (&explain_ctx).into(),
        )?;

        let stage = PeekStage::Optimize(PeekStageOptimize {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            id_bundle,
            target_replica,
            determination,
            optimizer,
            explain_ctx,
        });
        Ok(StageResult::Immediate(Box::new(stage)))
    }

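    /// Runs the optimizer pipeline for the peek (or COPY TO) on a dedicated
    /// blocking task, since optimization can be expensive, and routes the result
    /// to the appropriate next stage.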
    #[instrument]
    async fn peek_optimize(
        &self,
        session: &Session,
        PeekStageOptimize {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            id_bundle,
            target_replica,
            determination,
            mut optimizer,
            explain_ctx,
        }: PeekStageOptimize,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        // Collect the state we need to move into the optimization task.
        let timestamp_context = determination.timestamp_context.clone();
        let stats = self
            .statistics_oracle(session, &source_ids, &timestamp_context.antichain(), true)
            .await
            .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
        let session = session.meta();
        let now = self.catalog().config().now.clone();
        let catalog = self.owned_catalog();
        let mut compute_instances = BTreeMap::new();
        if explain_ctx.needs_plan_insights() {
            // Plan insights compare the plan against hypothetical fast paths on
            // every user cluster, so snapshot them all here.
            for cluster in self.catalog().user_clusters() {
                let snapshot = self.instance_snapshot(cluster.id).expect("must exist");
                compute_instances.insert(cluster.name.clone(), snapshot);
            }
        }

        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
            || "optimize peek",
            move || {
                span.in_scope(|| {
                    let pipeline = || -> Result<
                        Either<
                            optimize::peek::GlobalLirPlan,
                            optimize::copy_to::GlobalLirPlan,
                        >,
                        AdapterError,
                    > {
                        let _dispatch_guard = explain_ctx.dispatch_guard();

                        let raw_expr = plan.source.clone();

                        match optimizer.as_mut() {
                            Either::Left(optimizer) => {
                                // HIR ⇒ MIR lowering and MIR optimization (local).
                                let local_mir_plan =
                                    optimizer.catch_unwind_optimize(raw_expr)?;
                                // Attach resolved context required to continue the
                                // pipeline.
                                let local_mir_plan = local_mir_plan.resolve(
                                    timestamp_context.clone(),
                                    &session,
                                    stats,
                                );
                                // MIR optimization (global), MIR ⇒ LIR lowering,
                                // and LIR optimization (global).
                                let global_lir_plan =
                                    optimizer.catch_unwind_optimize(local_mir_plan)?;

                                Ok(Either::Left(global_lir_plan))
                            }
                            Either::Right(optimizer) => {
                                // HIR ⇒ MIR lowering and MIR optimization (local).
                                let local_mir_plan =
                                    optimizer.catch_unwind_optimize(raw_expr)?;
                                // Attach resolved context required to continue the
                                // pipeline.
                                let local_mir_plan = local_mir_plan.resolve(
                                    timestamp_context.clone(),
                                    &session,
                                    stats,
                                );
                                // MIR optimization (global), MIR ⇒ LIR lowering,
                                // and LIR optimization (global).
                                let global_lir_plan =
                                    optimizer.catch_unwind_optimize(local_mir_plan)?;

                                Ok(Either::Right(global_lir_plan))
                            }
                        }
                    };

                    let pipeline_result = pipeline();
                    let optimization_finished_at = now();

                    let stage = match pipeline_result {
                        Ok(Either::Left(global_lir_plan)) => {
                            let optimizer = optimizer.unwrap_left();
                            let needs_plan_insights = explain_ctx.needs_plan_insights();
                            // Re-optimizing for plan insights can be expensive, so
                            // skip it if the original optimization already exceeded
                            // the configured duration threshold.
                            let opt_limit =
                                PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
                                    .get(catalog.system_config().dyncfgs());
                            let target_instance =
                                catalog.get_cluster(optimizer.cluster_id()).name.clone();
                            let enable_re_optimize =
                                !(matches!(explain_ctx, ExplainContext::PlanInsightsNotice(_))
                                    && optimizer.duration() > opt_limit);
                            let insights_ctx = needs_plan_insights
                                .then(|| PlanInsightsContext {
                                    stmt: plan
                                        .select
                                        .as_deref()
                                        .map(Clone::clone)
                                        .map(Statement::Select),
                                    raw_expr: plan.source.clone(),
                                    catalog,
                                    compute_instances,
                                    target_instance,
                                    metrics: optimizer.metrics().clone(),
                                    finishing: optimizer.finishing().clone(),
                                    optimizer_config: optimizer.config().clone(),
                                    session,
                                    timestamp_context,
                                    view_id: optimizer.select_id(),
                                    index_id: optimizer.index_id(),
                                    enable_re_optimize,
                                })
                                .map(Box::new);
                            match explain_ctx {
                                ExplainContext::Plan(explain_ctx) => {
                                    let (_, df_meta, _) = global_lir_plan.unapply();
                                    PeekStage::ExplainPlan(PeekStageExplainPlan {
                                        validity,
                                        optimizer,
                                        df_meta,
                                        explain_ctx,
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
                                    PeekStage::Finish(PeekStageFinish {
                                        validity,
                                        plan,
                                        max_query_result_size,
                                        id_bundle,
                                        target_replica,
                                        source_ids,
                                        determination,
                                        cluster_id: optimizer.cluster_id(),
                                        finishing: optimizer.finishing().clone(),
                                        plan_insights_optimizer_trace: Some(optimizer_trace),
                                        global_lir_plan,
                                        optimization_finished_at,
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::None => PeekStage::Finish(PeekStageFinish {
                                    validity,
                                    plan,
                                    max_query_result_size,
                                    id_bundle,
                                    target_replica,
                                    source_ids,
                                    determination,
                                    cluster_id: optimizer.cluster_id(),
                                    finishing: optimizer.finishing().clone(),
                                    plan_insights_optimizer_trace: None,
                                    global_lir_plan,
                                    optimization_finished_at,
                                    insights_ctx,
                                }),
                                ExplainContext::Pushdown => {
                                    let (plan, _, _) = global_lir_plan.unapply();
                                    let imports = match plan {
                                        PeekPlan::SlowPath(plan) => plan
                                            .desc
                                            .source_imports
                                            .into_iter()
                                            .filter_map(|(id, import)| {
                                                import.desc.arguments.operators.map(|mfp| (id, mfp))
                                            })
                                            .collect(),
                                        PeekPlan::FastPath(_) => BTreeMap::default(),
                                    };
                                    PeekStage::ExplainPushdown(PeekStageExplainPushdown {
                                        validity,
                                        determination,
                                        imports,
                                    })
                                }
                            }
                        }
                        Ok(Either::Right(global_lir_plan)) => {
                            let optimizer = optimizer.unwrap_right();
                            PeekStage::CopyToPreflight(PeekStageCopyTo {
                                validity,
                                optimizer,
                                global_lir_plan,
                                optimization_finished_at,
                                source_ids,
                            })
                        }
                        Err(err) => {
                            // In EXPLAIN BROKEN contexts we still want to render
                            // whatever the optimizer produced; otherwise bubble up
                            // the error.
                            let Some(optimizer) = optimizer.left() else {
                                return Err(err);
                            };
                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
                                return Err(err);
                            };

                            if explain_ctx.broken {
                                tracing::error!("error while handling EXPLAIN statement: {}", err);
                                PeekStage::ExplainPlan(PeekStageExplainPlan {
                                    validity,
                                    optimizer,
                                    df_meta: Default::default(),
                                    explain_ctx,
                                    insights_ctx: None,
                                })
                            } else {
                                return Err(err);
                            }
                        }
                    };
                    Ok(Box::new(stage))
                })
            },
        )))
    }

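    /// If real-time recency is required, waits (in a spawned task) for the
    /// real-time recency timestamp of the peek's sources; otherwise moves directly
    /// to the timestamp/read-hold stage.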
    #[instrument]
    async fn peek_real_time_recency(
        &self,
        session: &Session,
        PeekStageRealTimeRecency {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            oracle_read_ts,
            optimizer,
            explain_ctx,
        }: PeekStageRealTimeRecency,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let fut = self
            .determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied())
            .await?;

        match fut {
            Some(fut) => {
                let span = Span::current();
                Ok(StageResult::Handle(mz_ore::task::spawn(
                    || "peek real time recency",
                    async move {
                        let real_time_recency_ts = fut.await?;
                        let stage = PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
                            validity,
                            plan,
                            max_query_result_size,
                            target_replica,
                            timeline_context,
                            source_ids,
                            optimizer,
                            explain_ctx,
                            oracle_read_ts,
                            real_time_recency_ts: Some(real_time_recency_ts),
                        });
                        Ok(Box::new(stage))
                    }
                    .instrument(span),
                )))
            }
            None => Ok(StageResult::Immediate(Box::new(
                PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
                    validity,
                    plan,
                    max_query_result_size,
                    target_replica,
                    timeline_context,
                    source_ids,
                    optimizer,
                    explain_ctx,
                    oracle_read_ts,
                    real_time_recency_ts: None,
                }),
            ))),
        }
    }

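    /// Finishes sequencing the peek: emits optimizer and plan-insights notices,
    /// installs watch sets for statement logging, implements the peek plan, and
    /// assembles the final `ExecuteResponse`.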
    #[instrument]
    async fn peek_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        PeekStageFinish {
            validity: _,
            plan,
            max_query_result_size,
            id_bundle,
            target_replica,
            source_ids,
            determination,
            cluster_id,
            finishing,
            plan_insights_optimizer_trace,
            global_lir_plan,
            optimization_finished_at,
            insights_ctx,
        }: PeekStageFinish,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        if let Some(id) = ctx.extra.contents() {
            self.record_statement_lifecycle_event(
                &id,
                &StatementLifecycleEvent::OptimizationFinished,
                optimization_finished_at,
            );
        }

        let session = ctx.session_mut();
        let conn_id = session.conn_id().clone();

        let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
        let source_arity = typ.arity();

        emit_optimizer_notices(&*self.catalog, &*session, &df_meta.optimizer_notices);

        if let Some(trace) = plan_insights_optimizer_trace {
            let target_cluster = self.catalog().get_cluster(cluster_id);
            let features = OptimizerFeatures::from(self.catalog().system_config())
                .override_from(&target_cluster.config.features());
            let insights = trace
                .into_plan_insights(
                    &features,
                    &self.catalog().for_session(session),
                    Some(plan.finishing),
                    Some(target_cluster),
                    df_meta,
                    insights_ctx,
                )
                .await?;
            session.add_notice(AdapterNotice::PlanInsights(insights));
        }

        let planned_peek = PlannedPeek {
            plan: peek_plan,
            determination: determination.clone(),
            conn_id: conn_id.clone(),
            intermediate_result_type: typ,
            source_arity,
            source_ids,
        };

        if let Some(transient_index_id) = match &planned_peek.plan {
            peek::PeekPlan::FastPath(_) => None,
            peek::PeekPlan::SlowPath(PeekDataflowPlan { id, .. }) => Some(id),
        } {
            if let Some(statement_logging_id) = ctx.extra.contents() {
                self.set_transient_index_id(statement_logging_id, *transient_index_id);
            }
        }

        if let Some(logging_id) = ctx.extra().contents() {
            let watch_set = WatchSetCreation::new(
                logging_id,
                self.catalog.state(),
                &id_bundle,
                determination.timestamp_context.timestamp_or_default(),
            );
            self.install_peek_watch_sets(conn_id.clone(), watch_set).expect(
                "the old peek sequencing re-verifies the dependencies' existence \
                 before installing the new watch sets",
            );
        }

        let max_result_size = self.catalog().system_config().max_result_size();

        // Implement the peek and dispatch it to the target cluster.
        let resp = self
            .implement_peek_plan(
                ctx.extra_mut(),
                planned_peek,
                finishing,
                cluster_id,
                target_replica,
                max_result_size,
                max_query_result_size,
            )
            .await?;

        if ctx.session().vars().emit_timestamp_notice() {
            let explanation = self.explain_timestamp(
                ctx.session().conn_id(),
                ctx.session().pcx().wall_time,
                cluster_id,
                &id_bundle,
                determination,
            );
            ctx.session()
                .add_notice(AdapterNotice::QueryTimestamp { explanation });
        }

        let resp = match plan.copy_to {
            None => resp,
            Some(format) => ExecuteResponse::CopyTo {
                format,
                resp: Box::new(resp),
            },
        };
        Ok(StageResult::Response(resp))
    }

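    /// Runs preflight checks against the destination S3 sink before shipping the
    /// COPY TO dataflow, so that connection problems fail the statement early.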
    #[instrument]
    async fn peek_copy_to_preflight(
        &self,
        copy_to: PeekStageCopyTo,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let connection_context = self.connection_context().clone();
        Ok(StageResult::Handle(mz_ore::task::spawn(
            || "peek copy to preflight",
            async {
                let sinks = &copy_to.global_lir_plan.df_desc().sink_exports;
                if sinks.len() != 1 {
                    return Err(AdapterError::Internal(
                        "expected exactly one copy to s3 sink".into(),
                    ));
                }
                let (sink_id, sink_desc) = sinks
                    .first_key_value()
                    .expect("known to be exactly one copy to s3 sink");
                match &sink_desc.connection {
                    ComputeSinkConnection::CopyToS3Oneshot(conn) => {
                        mz_storage_types::sinks::s3_oneshot_sink::preflight(
                            connection_context,
                            &conn.aws_connection,
                            &conn.upload_info,
                            conn.connection_id,
                            *sink_id,
                        )
                        .await?;
                        Ok(Box::new(PeekStage::CopyToDataflow(copy_to)))
                    }
                    _ => Err(AdapterError::Internal(
                        "expected copy to s3 oneshot sink".into(),
                    )),
                }
            },
        )))
    }

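    /// Registers the COPY TO as an active compute sink, ships its dataflow to the
    /// target cluster, and retires the statement once the sink reports completion.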
    #[instrument]
    async fn peek_copy_to_dataflow(
        &mut self,
        ctx: &ExecuteContext,
        PeekStageCopyTo {
            validity: _,
            optimizer,
            global_lir_plan,
            optimization_finished_at,
            source_ids,
        }: PeekStageCopyTo,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        if let Some(id) = ctx.extra.contents() {
            self.record_statement_lifecycle_event(
                &id,
                &StatementLifecycleEvent::OptimizationFinished,
                optimization_finished_at,
            );
        }

        let sink_id = global_lir_plan.sink_id();
        let cluster_id = optimizer.cluster_id();

        let (df_desc, df_meta) = global_lir_plan.unapply();

        emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);

        // Register the COPY TO as an active compute sink; its result arrives on
        // `rx` once the sink completes.
        let (tx, rx) = oneshot::channel();
        let active_copy_to = ActiveCopyTo {
            conn_id: ctx.session().conn_id().clone(),
            tx,
            cluster_id,
            depends_on: source_ids,
        };
        drop(
            self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
                .await,
        );

        // Ship the dataflow and wait (in a spawned task) for the sink to report.
        self.ship_dataflow(df_desc, cluster_id, None).await;

        let span = Span::current();
        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
            || "peek copy to dataflow",
            async {
                let res = rx.await;
                match res {
                    Ok(res) => res,
                    Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
                }
            }
            .instrument(span),
        )))
    }

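    /// Renders the requested `EXPLAIN` output for the optimized peek and returns
    /// the rows as an immediate response.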
    #[instrument]
    async fn peek_explain_plan(
        &self,
        session: &Session,
        PeekStageExplainPlan {
            optimizer,
            insights_ctx,
            df_meta,
            explain_ctx,
            ..
        }: PeekStageExplainPlan,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let rows = super::super::explain_plan_inner(
            session,
            self.catalog(),
            df_meta,
            explain_ctx,
            optimizer,
            insights_ctx,
        )
        .await?;

        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
    }

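    /// Handles `EXPLAIN FILTER PUSHDOWN` for a peek by evaluating, for each source
    /// import, how its pushed-down MFP constrains the data that must be fetched.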
    #[instrument]
    async fn peek_explain_pushdown(
        &self,
        session: &Session,
        stage: PeekStageExplainPushdown,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let as_of = stage.determination.timestamp_context.antichain();
        // `mz_now()` resolves to the determined timestamp if one exists, and is
        // otherwise left unconstrained.
        let mz_now = stage
            .determination
            .timestamp_context
            .timestamp()
            .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
            .unwrap_or_else(ResultSpec::value_all);
        let fut = self
            .explain_pushdown_future(session, as_of, mz_now, stage.imports)
            .await;
        let span = Span::current();
        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
            || "peek explain pushdown",
            fut.instrument(span),
        )))
    }

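    /// Determines the query timestamp for a peek and acquires read holds on its
    /// dependent collections if necessary, enforcing transaction time-domain rules
    /// along the way.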
    #[instrument]
    pub(super) fn sequence_peek_timestamp(
        &mut self,
        session: &mut Session,
        when: &QueryWhen,
        cluster_id: ClusterId,
        timeline_context: TimelineContext,
        oracle_read_ts: Option<Timestamp>,
        source_bundle: &CollectionIdBundle,
        source_ids: &BTreeSet<GlobalId>,
        real_time_recency_ts: Option<Timestamp>,
        requires_linearization: RequireLinearization,
    ) -> Result<TimestampDetermination<Timestamp>, AdapterError> {
        let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when);
        let timedomain_bundle;

        // Fetch or generate a timestamp for this query, along with the read holds
        // that go with it.
        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
            // Use the transaction's timestamp if it exists and this isn't an
            // AS OF query.
            Some(
                determination @ TimestampDetermination {
                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
                    ..
                },
            ) if in_immediate_multi_stmt_txn => (determination, None),
            _ => {
                let determine_bundle = if in_immediate_multi_stmt_txn {
                    // In a transaction, determine a timestamp that will be valid
                    // for anything in any schema referenced by the first query.
                    timedomain_bundle = timedomain_for(
                        self.catalog(),
                        &self.index_oracle(cluster_id),
                        source_ids,
                        &timeline_context,
                        session.conn_id(),
                        cluster_id,
                    )?;

                    &timedomain_bundle
                } else {
                    // If not in a transaction, use the source collections.
                    source_bundle
                };
                let (determination, read_holds) = self.determine_timestamp(
                    session,
                    determine_bundle,
                    when,
                    cluster_id,
                    &timeline_context,
                    oracle_read_ts,
                    real_time_recency_ts,
                )?;
                // Read holds are only needed if the query reads at a specific
                // timestamp; timestamp-independent queries can release them.
                let read_holds = match determination.timestamp_context.timestamp() {
                    Some(_ts) => Some(read_holds),
                    None => {
                        drop(read_holds);
                        None
                    }
                };
                (determination, read_holds)
            }
        };

        // If the transaction already holds read holds, verify that the peek stays
        // within the transaction's time domain; otherwise store the newly acquired
        // read holds on the transaction.
        if let Some(txn_reads) = self.txn_read_holds.get(session.conn_id()) {
            // Find referenced ids not covered by the transaction's read holds. A
            // reference outside the time domain can be caused by, e.g., a query
            // against an object in a different schema than the first query.
            let allowed_id_bundle = txn_reads.id_bundle();

            // The transaction already holds what it needs, so the read holds we
            // just acquired are redundant.
            drop(read_holds);

            let outside = source_bundle.difference(&allowed_id_bundle);
            if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
                let valid_names =
                    allowed_id_bundle.resolve_names(self.catalog(), session.conn_id());
                let invalid_names = outside.resolve_names(self.catalog(), session.conn_id());
                return Err(AdapterError::RelationOutsideTimeDomain {
                    relations: invalid_names,
                    names: valid_names,
                });
            }
        } else if let Some(read_holds) = read_holds {
            self.store_transaction_read_holds(session.conn_id().clone(), read_holds);
        }

        let mut transaction_determination = determination.clone();
        if when.is_transactional() {
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id,
                requires_linearization,
            })?;
        } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
            // If the query uses AS OF, ignore the timestamp but keep the
            // transaction in a peek-compatible state.
            transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id,
                requires_linearization,
            })?;
        };

        Ok(determination)
    }
}