use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

use itertools::Either;
use mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION;
use mz_compute_types::sinks::ComputeSinkConnection;
use mz_controller_types::ClusterId;
use mz_expr::{CollectionPlan, ResultSpec};
use mz_ore::cast::CastFrom;
use mz_ore::instrument;
use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
use mz_repr::{Datum, GlobalId, Timestamp};
use mz_sql::ast::{ExplainStage, Statement};
use mz_sql::catalog::CatalogCluster;
use mz_sql::plan::{self, QueryWhen};
use mz_sql::session::metadata::SessionMetadata;
use mz_transform::EmptyStatisticsOracle;
use tokio::sync::oneshot;
use tracing::{Instrument, Span, warn};

use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
use crate::command::ExecuteResponse;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::peek::{self, PeekDataflowPlan, PeekPlan, PlannedPeek};
use crate::coord::sequencer::inner::return_if_err;
use crate::coord::sequencer::{check_log_reads, emit_optimizer_notices, eval_copy_to_uri};
use crate::coord::timeline::{TimelineContext, timedomain_for};
use crate::coord::timestamp_selection::{
    TimestampContext, TimestampDetermination, TimestampProvider,
};
use crate::coord::{
    Coordinator, CopyToContext, ExecuteContext, ExplainContext, ExplainPlanContext, Message,
    PeekStage, PeekStageCopyTo, PeekStageExplainPlan, PeekStageExplainPushdown, PeekStageFinish,
    PeekStageLinearizeTimestamp, PeekStageOptimize, PeekStageRealTimeRecency,
    PeekStageTimestampReadHold, PlanValidity, StageResult, Staged, TargetCluster,
};
use crate::error::AdapterError;
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::OptimizerTrace;
use crate::notice::AdapterNotice;
use crate::optimize::{self, Optimize};
use crate::session::{RequireLinearization, Session, TransactionOps, TransactionStatus};
use crate::statement_logging::{StatementLifecycleEvent, WatchSetCreation};
impl Staged for PeekStage {
    type Ctx = ExecuteContext;

    fn validity(&mut self) -> &mut PlanValidity {
        match self {
            PeekStage::LinearizeTimestamp(stage) => &mut stage.validity,
            PeekStage::RealTimeRecency(stage) => &mut stage.validity,
            PeekStage::TimestampReadHold(stage) => &mut stage.validity,
            PeekStage::Optimize(stage) => &mut stage.validity,
            PeekStage::Finish(stage) => &mut stage.validity,
            PeekStage::ExplainPlan(stage) => &mut stage.validity,
            PeekStage::ExplainPushdown(stage) => &mut stage.validity,
            PeekStage::CopyToPreflight(stage) => &mut stage.validity,
            PeekStage::CopyToDataflow(stage) => &mut stage.validity,
        }
    }

    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut ExecuteContext,
    ) -> Result<StageResult<Box<Self>>, AdapterError> {
        match self {
            PeekStage::LinearizeTimestamp(stage) => {
                coord.peek_linearize_timestamp(ctx.session(), stage).await
            }
            PeekStage::RealTimeRecency(stage) => {
                coord.peek_real_time_recency(ctx.session(), stage).await
            }
            PeekStage::TimestampReadHold(stage) => {
                coord.peek_timestamp_read_hold(ctx.session_mut(), stage)
            }
            PeekStage::Optimize(stage) => coord.peek_optimize(ctx.session(), stage).await,
            PeekStage::Finish(stage) => coord.peek_finish(ctx, stage).await,
            PeekStage::ExplainPlan(stage) => coord.peek_explain_plan(ctx.session(), stage).await,
            PeekStage::ExplainPushdown(stage) => {
                coord.peek_explain_pushdown(ctx.session(), stage).await
            }
            PeekStage::CopyToPreflight(stage) => coord.peek_copy_to_preflight(stage).await,
            PeekStage::CopyToDataflow(stage) => coord.peek_copy_to_dataflow(ctx, stage).await,
        }
    }

    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
        Message::PeekStageReady {
            ctx,
            span,
            stage: self,
        }
    }

    fn cancel_enabled(&self) -> bool {
        true
    }
}

impl Coordinator {
    #[instrument]
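    /// Sequences a `SELECT` statement: validates the plan, then drives it
    /// through the `PeekStage` state machine (timestamp linearization and
    /// selection, optimization, and execution).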
    pub(crate) async fn sequence_peek(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::SelectPlan,
        target_cluster: TargetCluster,
        max_query_result_size: Option<u64>,
    ) {
        let explain_ctx = if ctx.session().vars().emit_plan_insights_notice() {
            let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
            ExplainContext::PlanInsightsNotice(optimizer_trace)
        } else {
            ExplainContext::None
        };

        let stage = return_if_err!(
            self.peek_validate(
                ctx.session(),
                plan,
                target_cluster,
                None,
                explain_ctx,
                max_query_result_size
            ),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

    #[instrument]
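    /// Sequences a `COPY ... TO '<url>'` statement by sending its `SELECT`
    /// through the peek machinery with a `CopyToContext` attached.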
    pub(crate) async fn sequence_copy_to(
        &mut self,
        ctx: ExecuteContext,
        plan::CopyToPlan {
            select_plan,
            desc,
            to,
            connection,
            connection_id,
            format,
            max_file_size,
        }: plan::CopyToPlan,
        target_cluster: TargetCluster,
    ) {
        let uri = return_if_err!(
            eval_copy_to_uri(to, ctx.session(), self.catalog().state()),
            ctx
        );

        let stage = return_if_err!(
            self.peek_validate(
                ctx.session(),
                select_plan,
                target_cluster,
                Some(CopyToContext {
                    desc,
                    uri,
                    connection,
                    connection_id,
                    format,
                    max_file_size,
                    output_batch_count: None,
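                    // Not yet known; filled in by `peek_validate` from the
                    // worker count of the target cluster's largest replica.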
                }),
                ExplainContext::None,
                Some(ctx.session().vars().max_query_result_size()),
            ),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

    #[instrument]
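    /// Sequences an `EXPLAIN` of a `SELECT`, collecting the requested
    /// optimizer stages into an `OptimizerTrace`.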
    pub(crate) async fn explain_peek(
        &mut self,
        ctx: ExecuteContext,
        plan::ExplainPlanPlan {
            stage,
            format,
            config,
            explainee,
        }: plan::ExplainPlanPlan,
        target_cluster: TargetCluster,
    ) {
        let plan::Explainee::Statement(stmt) = explainee else {
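            // Callers guarantee that `explain_peek` only sees statement
            // explainees.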
            unreachable!()
        };
        let plan::ExplaineeStatement::Select { broken, plan, desc } = stmt else {
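            // Callers guarantee that the explained statement is a `SELECT`.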
            unreachable!()
        };

        let optimizer_trace = OptimizerTrace::new(stage.paths());

        let stage = return_if_err!(
            self.peek_validate(
                ctx.session(),
                plan,
                target_cluster,
                None,
                ExplainContext::Plan(ExplainPlanContext {
                    broken,
                    config,
                    format,
                    stage,
                    replan: None,
                    desc: Some(desc),
                    optimizer_trace,
                }),
                Some(ctx.session().vars().max_query_result_size()),
            ),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

    #[instrument]
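    /// Validates a peek plan against the target cluster and the session, and
    /// produces the first stage of peek sequencing.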
    pub fn peek_validate(
        &self,
        session: &Session,
        plan: mz_sql::plan::SelectPlan,
        target_cluster: TargetCluster,
        copy_to_ctx: Option<CopyToContext>,
        explain_ctx: ExplainContext,
        max_query_result_size: Option<u64>,
    ) -> Result<PeekStage, AdapterError> {
        let catalog = self.owned_catalog();
        let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
        let compute_instance = self
            .instance_snapshot(cluster.id())
            .expect("compute instance does not exist");
        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
            .override_from(&self.catalog.get_cluster(cluster.id()).config.features())
            .override_from(&explain_ctx);

        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
            return Err(AdapterError::NoClusterReplicasAvailable {
                name: cluster.name.clone(),
                is_managed: cluster.is_managed(),
            });
        }

        let optimizer = match copy_to_ctx {
            None => {
                let (_, view_id) = self.allocate_transient_id();
                let (_, index_id) = self.allocate_transient_id();

                Either::Left(optimize::peek::Optimizer::new(
                    Arc::clone(&catalog),
                    compute_instance,
                    plan.finishing.clone(),
                    view_id,
                    index_id,
                    optimizer_config,
                    self.optimizer_metrics(),
                ))
            }
            Some(mut copy_to_ctx) => {
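                // The COPY TO output is written as one batch per worker, so
                // size the batch count to the largest replica in the cluster.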
288 let worker_counts = cluster.replicas().map(|r| {
292 let loc = &r.config.location;
293 loc.workers().unwrap_or_else(|| loc.num_processes())
294 });
295 let max_worker_count = match worker_counts.max() {
296 Some(count) => u64::cast_from(count),
297 None => {
298 return Err(AdapterError::NoClusterReplicasAvailable {
299 name: cluster.name.clone(),
300 is_managed: cluster.is_managed(),
301 });
302 }
303 };
304 copy_to_ctx.output_batch_count = Some(max_worker_count);
305 let (_, view_id) = self.allocate_transient_id();
306 Either::Right(optimize::copy_to::Optimizer::new(
308 Arc::clone(&catalog),
309 compute_instance,
310 view_id,
311 copy_to_ctx,
312 optimizer_config,
313 self.optimizer_metrics(),
314 ))
315 }
316 };
317
318 let target_replica_name = session.vars().cluster_replica();
319 let mut target_replica = target_replica_name
320 .map(|name| {
321 cluster
322 .replica_id(name)
323 .ok_or(AdapterError::UnknownClusterReplica {
324 cluster_name: cluster.name.clone(),
325 replica_name: name.to_string(),
326 })
327 })
328 .transpose()?;
329
330 let source_ids = plan.source.depends_on();
331 let mut timeline_context = self
332 .catalog()
333 .validate_timeline_context(source_ids.iter().copied())?;
334 if matches!(timeline_context, TimelineContext::TimestampIndependent)
335 && plan.source.contains_temporal()?
336 {
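            // A query that calls a temporal function (e.g. `mz_now()`) is
            // timestamp dependent even when none of its inputs are, and
            // `source_ids` alone cannot reveal that.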
            timeline_context = TimelineContext::TimestampDependent;
        }

        let notices = check_log_reads(
            &catalog,
            cluster,
            &source_ids,
            &mut target_replica,
            session.vars(),
        )?;
        session.add_notices(notices);

        let dependencies = source_ids
            .iter()
            .map(|id| self.catalog.resolve_item_id(id))
            .collect();
        let validity = PlanValidity::new(
            catalog.transient_revision(),
            dependencies,
            Some(cluster.id()),
            target_replica,
            session.role_metadata().clone(),
        );

        Ok(PeekStage::LinearizeTimestamp(PeekStageLinearizeTimestamp {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            optimizer,
            explain_ctx,
        }))
    }

    #[instrument]
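    /// Decides whether the peek requires a linearized read timestamp and, if
    /// so, fetches one from the timeline's timestamp oracle.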
    async fn peek_linearize_timestamp(
        &self,
        session: &Session,
        PeekStageLinearizeTimestamp {
            validity,
            source_ids,
            plan,
            max_query_result_size,
            target_replica,
            timeline_context,
            optimizer,
            explain_ctx,
        }: PeekStageLinearizeTimestamp,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let isolation_level = session.vars().transaction_isolation().clone();
        let timeline = Coordinator::get_timeline(&timeline_context);
        let needs_linearized_read_ts =
            Coordinator::needs_linearized_read_ts(&isolation_level, &plan.when);

        let build_stage = move |oracle_read_ts: Option<Timestamp>| PeekStageRealTimeRecency {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            oracle_read_ts,
            optimizer,
            explain_ctx,
        };

        match timeline {
            Some(timeline) if needs_linearized_read_ts => {
                let oracle = self.get_timestamp_oracle(&timeline);

                let span = Span::current();
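                // Reading from the oracle can block, so do it in a spawned
                // task rather than on the coordinator's main loop.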
                Ok(StageResult::Handle(mz_ore::task::spawn(
                    || "linearize timestamp",
                    async move {
                        let oracle_read_ts = oracle.read_ts().await;
                        let stage = build_stage(Some(oracle_read_ts));
                        let stage = PeekStage::RealTimeRecency(stage);
                        Ok(Box::new(stage))
                    }
                    .instrument(span),
                )))
            }
            Some(_) | None => {
                let stage = build_stage(None);
                let stage = PeekStage::RealTimeRecency(stage);
                Ok(StageResult::Immediate(Box::new(stage)))
            }
        }
    }

    #[instrument]
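    /// Determines the peek's timestamp, acquiring read holds on its inputs,
    /// and advances to the optimize stage.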
    fn peek_timestamp_read_hold(
        &mut self,
        session: &mut Session,
        PeekStageTimestampReadHold {
            mut validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            oracle_read_ts,
            real_time_recency_ts,
            optimizer,
            explain_ctx,
        }: PeekStageTimestampReadHold,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let cluster_id = match optimizer.as_ref() {
            Either::Left(optimizer) => optimizer.cluster_id(),
            Either::Right(optimizer) => optimizer.cluster_id(),
        };
        let id_bundle = self
            .dataflow_builder(cluster_id)
            .sufficient_collections(source_ids.iter().copied());

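        // The bundle can include collections beyond the raw source ids (e.g.
        // indexes), so extend the plan's validity to cover all of them.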
        let item_ids = id_bundle
            .iter()
            .map(|id| self.catalog().resolve_item_id(&id));
        validity.extend_dependencies(item_ids);

        let determination = self.sequence_peek_timestamp(
            session,
            &plan.when,
            cluster_id,
            timeline_context,
            oracle_read_ts,
            &id_bundle,
            &source_ids,
            real_time_recency_ts,
            (&explain_ctx).into(),
        )?;

        let stage = PeekStage::Optimize(PeekStageOptimize {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            id_bundle,
            target_replica,
            determination,
            optimizer,
            explain_ctx,
        });
        Ok(StageResult::Immediate(Box::new(stage)))
    }

    #[instrument]
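    /// Runs the optimizer pipeline for the peek and maps the outcome to the
    /// next stage, which depends on the `ExplainContext`.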
    async fn peek_optimize(
        &self,
        session: &Session,
        PeekStageOptimize {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            id_bundle,
            target_replica,
            determination,
            mut optimizer,
            explain_ctx,
        }: PeekStageOptimize,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let timestamp_context = determination.timestamp_context.clone();
        let stats = self
            .statistics_oracle(session, &source_ids, &timestamp_context.antichain(), true)
            .await
            .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
        let session = session.meta();
        let now = self.catalog().config().now.clone();
        let catalog = self.owned_catalog();
        let mut compute_instances = BTreeMap::new();
        if explain_ctx.needs_plan_insights() {
            for cluster in self.catalog().user_clusters() {
                let snapshot = self.instance_snapshot(cluster.id).expect("must exist");
                compute_instances.insert(cluster.name.clone(), snapshot);
            }
        }

        let span = Span::current();
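        // Optimization is CPU-bound and potentially slow, so run it on a
        // blocking task rather than on the coordinator's main loop.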
        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
            || "optimize peek",
            move || {
                span.in_scope(|| {
                    let pipeline = || -> Result<
                        Either<optimize::peek::GlobalLirPlan, optimize::copy_to::GlobalLirPlan>,
                        AdapterError,
                    > {
                        let _dispatch_guard = explain_ctx.dispatch_guard();

                        let raw_expr = plan.source.clone();

                        match optimizer.as_mut() {
                            Either::Left(optimizer) => {
                                let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
                                let local_mir_plan = local_mir_plan.resolve(
                                    timestamp_context.clone(),
                                    &session,
                                    stats,
                                );
                                let global_lir_plan =
                                    optimizer.catch_unwind_optimize(local_mir_plan)?;

                                Ok(Either::Left(global_lir_plan))
                            }
                            Either::Right(optimizer) => {
                                let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
                                let local_mir_plan = local_mir_plan.resolve(
                                    timestamp_context.clone(),
                                    &session,
                                    stats,
                                );
                                let global_lir_plan =
                                    optimizer.catch_unwind_optimize(local_mir_plan)?;

                                Ok(Either::Right(global_lir_plan))
                            }
                        }
                    };

                    let pipeline_result = pipeline();
                    let optimization_finished_at = now();

                    let stage = match pipeline_result {
                        Ok(Either::Left(global_lir_plan)) => {
                            let optimizer = optimizer.unwrap_left();
                            let needs_plan_insights = explain_ctx.needs_plan_insights();
                            let opt_limit =
                                PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
                                    .get(catalog.system_config().dyncfgs());
                            let target_instance =
                                catalog.get_cluster(optimizer.cluster_id()).name.clone();
                            let enable_re_optimize =
                                !(matches!(explain_ctx, ExplainContext::PlanInsightsNotice(_))
                                    && optimizer.duration() > opt_limit);
                            let insights_ctx = needs_plan_insights
                                .then(|| PlanInsightsContext {
                                    stmt: plan
                                        .select
                                        .as_deref()
                                        .map(Clone::clone)
                                        .map(Statement::Select),
                                    raw_expr: plan.source.clone(),
                                    catalog,
                                    compute_instances,
                                    target_instance,
                                    metrics: optimizer.metrics().clone(),
                                    finishing: optimizer.finishing().clone(),
                                    optimizer_config: optimizer.config().clone(),
                                    session,
                                    timestamp_context,
                                    view_id: optimizer.select_id(),
                                    index_id: optimizer.index_id(),
                                    enable_re_optimize,
                                })
                                .map(Box::new);
                            match explain_ctx {
                                ExplainContext::Plan(explain_ctx) => {
                                    let (_, df_meta, _) = global_lir_plan.unapply();
                                    PeekStage::ExplainPlan(PeekStageExplainPlan {
                                        validity,
                                        optimizer,
                                        df_meta,
                                        explain_ctx,
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
                                    PeekStage::Finish(PeekStageFinish {
                                        validity,
                                        plan,
                                        max_query_result_size,
                                        id_bundle,
                                        target_replica,
                                        source_ids,
                                        determination,
                                        cluster_id: optimizer.cluster_id(),
                                        finishing: optimizer.finishing().clone(),
                                        plan_insights_optimizer_trace: Some(optimizer_trace),
                                        global_lir_plan,
                                        optimization_finished_at,
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::None => PeekStage::Finish(PeekStageFinish {
                                    validity,
                                    plan,
                                    max_query_result_size,
                                    id_bundle,
                                    target_replica,
                                    source_ids,
                                    determination,
                                    cluster_id: optimizer.cluster_id(),
                                    finishing: optimizer.finishing().clone(),
                                    plan_insights_optimizer_trace: None,
                                    global_lir_plan,
                                    optimization_finished_at,
                                    insights_ctx,
                                }),
                                ExplainContext::Pushdown => {
                                    let (plan, _, _) = global_lir_plan.unapply();
                                    let imports = match plan {
                                        PeekPlan::SlowPath(plan) => plan
                                            .desc
                                            .source_imports
                                            .into_iter()
                                            .filter_map(|(id, (desc, _, _upper))| {
                                                desc.arguments.operators.map(|mfp| (id, mfp))
                                            })
                                            .collect(),
                                        PeekPlan::FastPath(_) => BTreeMap::default(),
                                    };
                                    PeekStage::ExplainPushdown(PeekStageExplainPushdown {
                                        validity,
                                        determination,
                                        imports,
                                    })
                                }
                            }
                        }
                        Ok(Either::Right(global_lir_plan)) => {
                            let optimizer = optimizer.unwrap_right();
                            PeekStage::CopyToPreflight(PeekStageCopyTo {
                                validity,
                                optimizer,
                                global_lir_plan,
                                optimization_finished_at,
                                source_ids,
                            })
                        }
                        Err(err) => {
                            let Some(optimizer) = optimizer.left() else {
                                return Err(err);
                            };
                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
                                return Err(err);
                            };

                            if explain_ctx.broken {
                                tracing::error!("error while handling EXPLAIN statement: {}", err);
                                PeekStage::ExplainPlan(PeekStageExplainPlan {
                                    validity,
                                    optimizer,
                                    df_meta: Default::default(),
                                    explain_ctx,
                                    insights_ctx: None,
                                })
                            } else {
                                return Err(err);
                            }
                        }
                    };
                    Ok(Box::new(stage))
                })
            },
        )))
    }

    #[instrument]
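    /// Determines a real-time recency timestamp for the peek's sources, if
    /// the session requires real-time recency.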
    async fn peek_real_time_recency(
        &self,
        session: &Session,
        PeekStageRealTimeRecency {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            oracle_read_ts,
            optimizer,
            explain_ctx,
        }: PeekStageRealTimeRecency,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let fut = self
            .determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied())
            .await?;

        match fut {
            Some(fut) => {
                let span = Span::current();
                Ok(StageResult::Handle(mz_ore::task::spawn(
                    || "peek real time recency",
                    async move {
                        let real_time_recency_ts = fut.await?;
                        let stage = PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
                            validity,
                            plan,
                            max_query_result_size,
                            target_replica,
                            timeline_context,
                            source_ids,
                            optimizer,
                            explain_ctx,
                            oracle_read_ts,
                            real_time_recency_ts: Some(real_time_recency_ts),
                        });
                        Ok(Box::new(stage))
                    }
                    .instrument(span),
                )))
            }
            None => Ok(StageResult::Immediate(Box::new(
                PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
                    validity,
                    plan,
                    max_query_result_size,
                    target_replica,
                    timeline_context,
                    source_ids,
                    optimizer,
                    explain_ctx,
                    oracle_read_ts,
                    real_time_recency_ts: None,
                }),
            ))),
        }
    }

    #[instrument]
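    /// Finishes sequencing the peek: emits notices, installs watch sets for
    /// statement logging, and implements the peek plan against the cluster.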
    async fn peek_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        PeekStageFinish {
            validity: _,
            plan,
            max_query_result_size,
            id_bundle,
            target_replica,
            source_ids,
            determination,
            cluster_id,
            finishing,
            plan_insights_optimizer_trace,
            global_lir_plan,
            optimization_finished_at,
            insights_ctx,
        }: PeekStageFinish,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        if let Some(id) = ctx.extra.contents() {
            self.record_statement_lifecycle_event(
                &id,
                &StatementLifecycleEvent::OptimizationFinished,
                optimization_finished_at,
            );
        }

        let session = ctx.session_mut();
        let conn_id = session.conn_id().clone();

        let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
        let source_arity = typ.arity();

        emit_optimizer_notices(&*self.catalog, &*session, &df_meta.optimizer_notices);

        if let Some(trace) = plan_insights_optimizer_trace {
            let target_cluster = self.catalog().get_cluster(cluster_id);
            let features = OptimizerFeatures::from(self.catalog().system_config())
                .override_from(&target_cluster.config.features());
            let insights = trace
                .into_plan_insights(
                    &features,
                    &self.catalog().for_session(session),
                    Some(plan.finishing),
                    Some(target_cluster),
                    df_meta,
                    insights_ctx,
                )
                .await?;
            session.add_notice(AdapterNotice::PlanInsights(insights));
        }

        let planned_peek = PlannedPeek {
            plan: peek_plan,
            determination: determination.clone(),
            conn_id: conn_id.clone(),
            intermediate_result_type: typ,
            source_arity,
            source_ids,
        };

        if let Some(transient_index_id) = match &planned_peek.plan {
            peek::PeekPlan::FastPath(_) => None,
            peek::PeekPlan::SlowPath(PeekDataflowPlan { id, .. }) => Some(id),
        } {
            if let Some(statement_logging_id) = ctx.extra.contents() {
                self.set_transient_index_id(statement_logging_id, *transient_index_id);
            }
        }

        if let Some(logging_id) = ctx.extra().contents() {
            let watch_set = WatchSetCreation::new(
                logging_id,
                self.catalog.state(),
                &id_bundle,
                determination.timestamp_context.timestamp_or_default(),
            );
            self.install_peek_watch_sets(conn_id.clone(), watch_set).expect(
                "the old peek sequencing re-verifies the dependencies' existence before \
                 installing the new watch sets",
            );
        }

        let max_result_size = self.catalog().system_config().max_result_size();

        let resp = self
            .implement_peek_plan(
                ctx.extra_mut(),
                planned_peek,
                finishing,
                cluster_id,
                target_replica,
                max_result_size,
                max_query_result_size,
            )
            .await?;

        if ctx.session().vars().emit_timestamp_notice() {
            let explanation = self.explain_timestamp(
                ctx.session().conn_id(),
                ctx.session().pcx().wall_time,
                cluster_id,
                &id_bundle,
                determination,
            );
            ctx.session()
                .add_notice(AdapterNotice::QueryTimestamp { explanation });
        }

        let resp = match plan.copy_to {
            None => resp,
            Some(format) => ExecuteResponse::CopyTo {
                format,
                resp: Box::new(resp),
            },
        };
        Ok(StageResult::Response(resp))
    }

    #[instrument]
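    /// Runs preflight checks for the COPY TO S3 sink before shipping the
    /// dataflow that produces its output.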
    async fn peek_copy_to_preflight(
        &self,
        copy_to: PeekStageCopyTo,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let connection_context = self.connection_context().clone();
        Ok(StageResult::Handle(mz_ore::task::spawn(
            || "peek copy to preflight",
            async {
                let sinks = &copy_to.global_lir_plan.df_desc().sink_exports;
                if sinks.len() != 1 {
                    return Err(AdapterError::Internal(
                        "expected exactly one copy to s3 sink".into(),
                    ));
                }
                let (sink_id, sink_desc) = sinks
                    .first_key_value()
                    .expect("known to be exactly one copy to s3 sink");
                match &sink_desc.connection {
                    ComputeSinkConnection::CopyToS3Oneshot(conn) => {
                        mz_storage_types::sinks::s3_oneshot_sink::preflight(
                            connection_context,
                            &conn.aws_connection,
                            &conn.upload_info,
                            conn.connection_id,
                            *sink_id,
                        )
                        .await?;
                        Ok(Box::new(PeekStage::CopyToDataflow(copy_to)))
                    }
                    _ => Err(AdapterError::Internal(
                        "expected copy to s3 oneshot sink".into(),
                    )),
                }
            },
        )))
    }

    #[instrument]
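    /// Ships the COPY TO dataflow, registers it as an active compute sink,
    /// and returns a handle that resolves when the copy completes.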
    async fn peek_copy_to_dataflow(
        &mut self,
        ctx: &ExecuteContext,
        PeekStageCopyTo {
            validity: _,
            optimizer,
            global_lir_plan,
            optimization_finished_at,
            source_ids,
        }: PeekStageCopyTo,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        if let Some(id) = ctx.extra.contents() {
            self.record_statement_lifecycle_event(
                &id,
                &StatementLifecycleEvent::OptimizationFinished,
                optimization_finished_at,
            );
        }

        let sink_id = global_lir_plan.sink_id();
        let cluster_id = optimizer.cluster_id();

        let (df_desc, df_meta) = global_lir_plan.unapply();

        emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);

        let (tx, rx) = oneshot::channel();
        let active_copy_to = ActiveCopyTo {
            conn_id: ctx.session().conn_id().clone(),
            tx,
            cluster_id,
            depends_on: source_ids,
        };
        drop(
            self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
                .await,
        );

        self.ship_dataflow(df_desc, cluster_id, None).await;

        let span = Span::current();
        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
            || "peek copy to dataflow",
            async {
                let res = rx.await;
                match res {
                    Ok(res) => res,
                    Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
                }
            }
            .instrument(span),
        )))
    }

    #[instrument]
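    /// Renders the rows for an `EXPLAIN` of the optimized peek.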
    async fn peek_explain_plan(
        &self,
        session: &Session,
        PeekStageExplainPlan {
            optimizer,
            insights_ctx,
            df_meta,
            explain_ctx,
            ..
        }: PeekStageExplainPlan,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let rows = super::super::explain_plan_inner(
            session,
            self.catalog(),
            df_meta,
            explain_ctx,
            optimizer,
            insights_ctx,
        )
        .await?;

        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
    }

    #[instrument]
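    /// Computes the rows for an `EXPLAIN FILTER PUSHDOWN`, evaluating how
    /// much of each source import's filter can be pushed down at the
    /// determined timestamp.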
    async fn peek_explain_pushdown(
        &self,
        session: &Session,
        stage: PeekStageExplainPushdown,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let as_of = stage.determination.timestamp_context.antichain();
        let mz_now = stage
            .determination
            .timestamp_context
            .timestamp()
            .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
            .unwrap_or_else(ResultSpec::value_all);
        let fut = self
            .explain_pushdown_future(session, as_of, mz_now, stage.imports)
            .await;
        let span = Span::current();
        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
            || "peek explain pushdown",
            fut.instrument(span),
        )))
    }

    #[instrument]
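    /// Determines the query timestamp for a peek, handling the interaction
    /// with any ongoing transaction: reusing the transaction's timestamp,
    /// restricting reads to its time domain, and recording transaction ops.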
    pub(super) fn sequence_peek_timestamp(
        &mut self,
        session: &mut Session,
        when: &QueryWhen,
        cluster_id: ClusterId,
        timeline_context: TimelineContext,
        oracle_read_ts: Option<Timestamp>,
        source_bundle: &CollectionIdBundle,
        source_ids: &BTreeSet<GlobalId>,
        real_time_recency_ts: Option<Timestamp>,
        requires_linearization: RequireLinearization,
    ) -> Result<TimestampDetermination<Timestamp>, AdapterError> {
        let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when);
        let timedomain_bundle;

        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
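            // An immediate multi-statement transaction that already picked a
            // timeline timestamp keeps using it for subsequent peeks.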
            Some(
                determination @ TimestampDetermination {
                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
                    ..
                },
            ) if in_immediate_multi_stmt_txn => (determination, None),
            _ => {
                let determine_bundle = if in_immediate_multi_stmt_txn {
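                    // In a multi-statement transaction the timestamp must be
                    // valid for the transaction's entire time domain, not
                    // just this query's sources, so later statements can
                    // reuse it.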
                    timedomain_bundle = timedomain_for(
                        self.catalog(),
                        &self.index_oracle(cluster_id),
                        source_ids,
                        &timeline_context,
                        session.conn_id(),
                        cluster_id,
                    )?;

                    &timedomain_bundle
                } else {
                    source_bundle
                };
                let (determination, read_holds) = self.determine_timestamp(
                    session,
                    determine_bundle,
                    when,
                    cluster_id,
                    &timeline_context,
                    oracle_read_ts,
                    real_time_recency_ts,
                )?;
                let read_holds = match determination.timestamp_context.timestamp() {
                    Some(_ts) => Some(read_holds),
                    None => {
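                        // A determination without a timestamp (e.g. a
                        // constant query) reads no collections at a specific
                        // time, so read holds are unnecessary.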
                        drop(read_holds);
                        None
                    }
                };
                (determination, read_holds)
            }
        };

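        // If the session already holds transaction-wide read holds, verify
        // that this peek stays inside the transaction's time domain rather
        // than installing new holds.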
        if let Some(txn_reads) = self.txn_read_holds.get(session.conn_id()) {
            let allowed_id_bundle = txn_reads.id_bundle();

            drop(read_holds);

            let outside = source_bundle.difference(&allowed_id_bundle);
            if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
                let valid_names =
                    allowed_id_bundle.resolve_names(self.catalog(), session.conn_id());
                let invalid_names = outside.resolve_names(self.catalog(), session.conn_id());
                return Err(AdapterError::RelationOutsideTimeDomain {
                    relations: invalid_names,
                    names: valid_names,
                });
            }
        } else if let Some(read_holds) = read_holds {
            self.store_transaction_read_holds(session.conn_id().clone(), read_holds);
        }

        let mut transaction_determination = determination.clone();
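        // Record the peek in the transaction's ops: transactional `when`s
        // carry their timestamp determination forward, while other peeks
        // inside an explicit transaction are recorded without a timestamp.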
        if when.is_transactional() {
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id,
                requires_linearization,
            })?;
        } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
            transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id,
                requires_linearization,
            })?;
        };

        Ok(determination)
    }
}