1use std::collections::{BTreeMap, BTreeSet};
11use std::sync::Arc;
12
13use mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION;
14use mz_compute_types::sinks::ComputeSinkConnection;
15use mz_controller_types::ClusterId;
16use mz_expr::{CollectionPlan, ResultSpec};
17use mz_ore::cast::CastFrom;
18use mz_ore::instrument;
19use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
20use mz_repr::{Datum, GlobalId, Timestamp};
21use mz_sql::ast::{ExplainStage, Statement};
22use mz_sql::catalog::CatalogCluster;
23use mz_sql::plan::QueryWhen;
25use mz_sql::plan::{self};
26use mz_sql::session::metadata::SessionMetadata;
27use mz_transform::EmptyStatisticsOracle;
28use tokio::sync::oneshot;
29use tracing::warn;
30use tracing::{Instrument, Span};
31
32use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
33use crate::command::ExecuteResponse;
34use crate::coord::id_bundle::CollectionIdBundle;
35use crate::coord::peek::{self, PeekDataflowPlan, PeekPlan, PlannedPeek};
36use crate::coord::sequencer::inner::return_if_err;
37use crate::coord::sequencer::{check_log_reads, emit_optimizer_notices, eval_copy_to_uri};
38use crate::coord::timeline::{TimelineContext, timedomain_for};
39use crate::coord::timestamp_selection::{
40 TimestampContext, TimestampDetermination, TimestampProvider,
41};
42use crate::coord::{
43 Coordinator, CopyToContext, ExecuteContext, ExplainContext, ExplainPlanContext, Message,
44 PeekStage, PeekStageCopyTo, PeekStageExplainPlan, PeekStageExplainPushdown, PeekStageFinish,
45 PeekStageLinearizeTimestamp, PeekStageOptimize, PeekStageRealTimeRecency,
46 PeekStageTimestampReadHold, PlanValidity, StageResult, Staged, TargetCluster,
47};
48use crate::error::AdapterError;
49use crate::explain::insights::PlanInsightsContext;
50use crate::explain::optimizer_trace::OptimizerTrace;
51use crate::notice::AdapterNotice;
52use crate::optimize;
53use crate::session::{RequireLinearization, Session, TransactionOps, TransactionStatus};
54use crate::statement_logging::StatementLifecycleEvent;
55use crate::statement_logging::WatchSetCreation;
56
57impl Staged for PeekStage {
58 type Ctx = ExecuteContext;
59
60 fn validity(&mut self) -> &mut PlanValidity {
61 match self {
62 PeekStage::LinearizeTimestamp(stage) => &mut stage.validity,
63 PeekStage::RealTimeRecency(stage) => &mut stage.validity,
64 PeekStage::TimestampReadHold(stage) => &mut stage.validity,
65 PeekStage::Optimize(stage) => &mut stage.validity,
66 PeekStage::Finish(stage) => &mut stage.validity,
67 PeekStage::ExplainPlan(stage) => &mut stage.validity,
68 PeekStage::ExplainPushdown(stage) => &mut stage.validity,
69 PeekStage::CopyToPreflight(stage) => &mut stage.validity,
70 PeekStage::CopyToDataflow(stage) => &mut stage.validity,
71 }
72 }
73
74 async fn stage(
75 self,
76 coord: &mut Coordinator,
77 ctx: &mut ExecuteContext,
78 ) -> Result<StageResult<Box<Self>>, AdapterError> {
79 match self {
80 PeekStage::LinearizeTimestamp(stage) => {
81 coord.peek_linearize_timestamp(ctx.session(), stage).await
82 }
83 PeekStage::RealTimeRecency(stage) => {
84 coord.peek_real_time_recency(ctx.session(), stage).await
85 }
86 PeekStage::TimestampReadHold(stage) => {
87 coord.peek_timestamp_read_hold(ctx.session_mut(), stage)
88 }
89 PeekStage::Optimize(stage) => coord.peek_optimize(ctx.session(), stage).await,
90 PeekStage::Finish(stage) => coord.peek_finish(ctx, stage).await,
91 PeekStage::ExplainPlan(stage) => coord.peek_explain_plan(ctx.session(), stage).await,
92 PeekStage::ExplainPushdown(stage) => {
93 coord.peek_explain_pushdown(ctx.session(), stage).await
94 }
95 PeekStage::CopyToPreflight(stage) => coord.peek_copy_to_preflight(stage).await,
96 PeekStage::CopyToDataflow(stage) => coord.peek_copy_to_dataflow(ctx, stage).await,
97 }
98 }
99
100 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
101 Message::PeekStageReady {
102 ctx,
103 span,
104 stage: self,
105 }
106 }
107
108 fn cancel_enabled(&self) -> bool {
109 true
110 }
111}
112
113impl Coordinator {
114 #[instrument]
121 pub(crate) async fn sequence_peek(
122 &mut self,
123 ctx: ExecuteContext,
124 plan: plan::SelectPlan,
125 target_cluster: TargetCluster,
126 max_query_result_size: Option<u64>,
127 ) {
128 let explain_ctx = if ctx.session().vars().emit_plan_insights_notice() {
129 let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
130 ExplainContext::PlanInsightsNotice(optimizer_trace)
131 } else {
132 ExplainContext::None
133 };
134
135 let stage = return_if_err!(
136 self.peek_validate(
137 ctx.session(),
138 plan,
139 target_cluster,
140 None,
141 explain_ctx,
142 max_query_result_size
143 ),
144 ctx
145 );
146 self.sequence_staged(ctx, Span::current(), stage).await;
147 }
148
149 #[instrument]
150 pub(crate) async fn sequence_copy_to(
151 &mut self,
152 ctx: ExecuteContext,
153 plan::CopyToPlan {
154 select_plan,
155 desc,
156 to,
157 connection,
158 connection_id,
159 format,
160 max_file_size,
161 }: plan::CopyToPlan,
162 target_cluster: TargetCluster,
163 ) {
164 let uri = return_if_err!(
165 eval_copy_to_uri(to, ctx.session(), self.catalog().state()),
166 ctx
167 );
168
169 let stage = return_if_err!(
170 self.peek_validate(
171 ctx.session(),
172 select_plan,
173 target_cluster,
174 Some(CopyToContext {
175 desc,
176 uri,
177 connection,
178 connection_id,
179 format,
180 max_file_size,
181 output_batch_count: None,
183 }),
184 ExplainContext::None,
185 Some(ctx.session().vars().max_query_result_size()),
186 ),
187 ctx
188 );
189 self.sequence_staged(ctx, Span::current(), stage).await;
190 }
191
192 #[instrument]
193 pub(crate) async fn explain_peek(
194 &mut self,
195 ctx: ExecuteContext,
196 plan::ExplainPlanPlan {
197 stage,
198 format,
199 config,
200 explainee,
201 }: plan::ExplainPlanPlan,
202 target_cluster: TargetCluster,
203 ) {
204 let plan::Explainee::Statement(stmt) = explainee else {
205 unreachable!()
208 };
209 let plan::ExplaineeStatement::Select { broken, plan, desc } = stmt else {
210 unreachable!()
213 };
214
215 let optimizer_trace = OptimizerTrace::new(stage.paths());
218
219 let stage = return_if_err!(
220 self.peek_validate(
221 ctx.session(),
222 plan,
223 target_cluster,
224 None,
225 ExplainContext::Plan(ExplainPlanContext {
226 broken,
227 config,
228 format,
229 stage,
230 replan: None,
231 desc: Some(desc),
232 optimizer_trace,
233 }),
234 Some(ctx.session().vars().max_query_result_size()),
235 ),
236 ctx
237 );
238 self.sequence_staged(ctx, Span::current(), stage).await;
239 }
240
241 #[instrument]
243 pub fn peek_validate(
244 &self,
245 session: &Session,
246 plan: mz_sql::plan::SelectPlan,
247 target_cluster: TargetCluster,
248 copy_to_ctx: Option<CopyToContext>,
249 explain_ctx: ExplainContext,
250 max_query_result_size: Option<u64>,
251 ) -> Result<PeekStage, AdapterError> {
252 let catalog = self.owned_catalog();
254 let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
255 let compute_instance = self
256 .instance_snapshot(cluster.id())
257 .expect("compute instance does not exist");
258 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
259 .override_from(&self.catalog.get_cluster(cluster.id()).config.features())
260 .override_from(&explain_ctx);
261
262 if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
263 return Err(AdapterError::NoClusterReplicasAvailable {
264 name: cluster.name.clone(),
265 is_managed: cluster.is_managed(),
266 });
267 }
268
269 let optimizer = match copy_to_ctx {
270 None => {
271 let (_, view_id) = self.allocate_transient_id();
273 let (_, index_id) = self.allocate_transient_id();
274
275 optimize::PeekOptimizer::Select(optimize::peek::Optimizer::new(
277 Arc::clone(&catalog),
278 compute_instance,
279 plan.finishing.clone(),
280 view_id,
281 index_id,
282 optimizer_config,
283 self.optimizer_metrics(),
284 ))
285 }
286 Some(mut copy_to_ctx) => {
287 let worker_counts = cluster.replicas().map(|r| {
291 let loc = &r.config.location;
292 loc.workers().unwrap_or_else(|| loc.num_processes())
293 });
294 let max_worker_count = match worker_counts.max() {
295 Some(count) => u64::cast_from(count),
296 None => {
297 return Err(AdapterError::NoClusterReplicasAvailable {
298 name: cluster.name.clone(),
299 is_managed: cluster.is_managed(),
300 });
301 }
302 };
303 copy_to_ctx.output_batch_count = Some(max_worker_count);
304 let (_, view_id) = self.allocate_transient_id();
305 optimize::PeekOptimizer::CopyTo(optimize::copy_to::Optimizer::new(
307 Arc::clone(&catalog),
308 compute_instance,
309 view_id,
310 copy_to_ctx,
311 optimizer_config,
312 self.optimizer_metrics(),
313 ))
314 }
315 };
316
317 let target_replica_name = session.vars().cluster_replica();
318 let mut target_replica = target_replica_name
319 .map(|name| {
320 cluster
321 .replica_id(name)
322 .ok_or(AdapterError::UnknownClusterReplica {
323 cluster_name: cluster.name.clone(),
324 replica_name: name.to_string(),
325 })
326 })
327 .transpose()?;
328
329 let source_ids = plan.source.depends_on();
330 let mut timeline_context = self
331 .catalog()
332 .validate_timeline_context(source_ids.iter().copied())?;
333 if matches!(timeline_context, TimelineContext::TimestampIndependent)
334 && plan.source.contains_temporal()?
335 {
336 timeline_context = TimelineContext::TimestampDependent;
340 }
341
342 let notices = check_log_reads(
343 &catalog,
344 cluster,
345 &source_ids,
346 &mut target_replica,
347 session.vars(),
348 )?;
349 session.add_notices(notices);
350
351 let dependencies = source_ids
352 .iter()
353 .map(|id| self.catalog.resolve_item_id(id))
354 .collect();
355 let validity = PlanValidity::new(
356 catalog.transient_revision(),
357 dependencies,
358 Some(cluster.id()),
359 target_replica,
360 session.role_metadata().clone(),
361 );
362
363 Ok(PeekStage::LinearizeTimestamp(PeekStageLinearizeTimestamp {
364 validity,
365 plan,
366 max_query_result_size,
367 source_ids,
368 target_replica,
369 timeline_context,
370 optimizer,
371 explain_ctx,
372 }))
373 }
374
375 #[instrument]
377 async fn peek_linearize_timestamp(
378 &self,
379 session: &Session,
380 PeekStageLinearizeTimestamp {
381 validity,
382 source_ids,
383 plan,
384 max_query_result_size,
385 target_replica,
386 timeline_context,
387 optimizer,
388 explain_ctx,
389 }: PeekStageLinearizeTimestamp,
390 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
391 let isolation_level = session.vars().transaction_isolation().clone();
392 let timeline = Coordinator::get_timeline(&timeline_context);
393 let needs_linearized_read_ts =
394 Coordinator::needs_linearized_read_ts(&isolation_level, &plan.when);
395
396 let build_stage = move |oracle_read_ts: Option<Timestamp>| PeekStageRealTimeRecency {
397 validity,
398 plan,
399 max_query_result_size,
400 source_ids,
401 target_replica,
402 timeline_context,
403 oracle_read_ts,
404 optimizer,
405 explain_ctx,
406 };
407
408 match timeline {
409 Some(timeline) if needs_linearized_read_ts => {
410 let oracle = self.get_timestamp_oracle(&timeline);
411
412 let span = Span::current();
416 Ok(StageResult::Handle(mz_ore::task::spawn(
417 || "linearize timestamp",
418 async move {
419 let oracle_read_ts = oracle.read_ts().await;
420 let stage = build_stage(Some(oracle_read_ts));
421 let stage = PeekStage::RealTimeRecency(stage);
422 Ok(Box::new(stage))
423 }
424 .instrument(span),
425 )))
426 }
427 Some(_) | None => {
428 let stage = build_stage(None);
429 let stage = PeekStage::RealTimeRecency(stage);
430 Ok(StageResult::Immediate(Box::new(stage)))
431 }
432 }
433 }
434
435 #[instrument]
437 fn peek_timestamp_read_hold(
438 &mut self,
439 session: &mut Session,
440 PeekStageTimestampReadHold {
441 mut validity,
442 plan,
443 max_query_result_size,
444 source_ids,
445 target_replica,
446 timeline_context,
447 oracle_read_ts,
448 real_time_recency_ts,
449 optimizer,
450 explain_ctx,
451 }: PeekStageTimestampReadHold,
452 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
453 let cluster_id = optimizer.cluster_id();
454 let id_bundle = self
455 .dataflow_builder(cluster_id)
456 .sufficient_collections(source_ids.iter().copied());
457
458 let item_ids = id_bundle
461 .iter()
462 .map(|id| self.catalog().resolve_item_id(&id));
463 validity.extend_dependencies(item_ids);
464
465 let determination = self.sequence_peek_timestamp(
466 session,
467 &plan.when,
468 cluster_id,
469 timeline_context,
470 oracle_read_ts,
471 &id_bundle,
472 &source_ids,
473 real_time_recency_ts,
474 (&explain_ctx).into(),
475 )?;
476
477 let stage = PeekStage::Optimize(PeekStageOptimize {
478 validity,
479 plan,
480 max_query_result_size,
481 source_ids,
482 id_bundle,
483 target_replica,
484 determination,
485 optimizer,
486 explain_ctx,
487 });
488 Ok(StageResult::Immediate(Box::new(stage)))
489 }
490
491 #[instrument]
492 async fn peek_optimize(
493 &self,
494 session: &Session,
495 PeekStageOptimize {
496 validity,
497 plan,
498 max_query_result_size,
499 source_ids,
500 id_bundle,
501 target_replica,
502 determination,
503 mut optimizer,
504 explain_ctx,
505 }: PeekStageOptimize,
506 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
507 let timestamp_context = determination.timestamp_context.clone();
510 let stats = self
511 .statistics_oracle(session, &source_ids, ×tamp_context.antichain(), true)
512 .await
513 .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
514 let session = session.meta();
515 let now = self.catalog().config().now.clone();
516 let catalog = self.owned_catalog();
517 let mut compute_instances = BTreeMap::new();
518 if explain_ctx.needs_plan_insights() {
519 for cluster in self.catalog().user_clusters() {
522 let snapshot = self.instance_snapshot(cluster.id).expect("must exist");
523 compute_instances.insert(cluster.name.clone(), snapshot);
524 }
525 }
526
527 let span = Span::current();
528 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
529 || "optimize peek",
530 move || {
531 span.in_scope(|| {
532 let pipeline_result = {
538 let _dispatch_guard = explain_ctx.dispatch_guard();
539
540 let raw_expr = plan.source.clone();
541
542 optimizer
543 .optimize(raw_expr, timestamp_context.clone(), &session, stats)
544 .map_err(AdapterError::from)
545 };
546
547 let optimization_finished_at = now();
548
549 let stage = match pipeline_result {
550 Ok(optimize::PeekGlobalLirPlan::Select(global_lir_plan)) => {
551 let optimizer =
552 optimizer.into_select().expect("a SELECT/EXPLAIN optimizer");
553 let needs_plan_insights = explain_ctx.needs_plan_insights();
555 let opt_limit =
560 PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
561 .get(catalog.system_config().dyncfgs());
562 let target_instance =
563 catalog.get_cluster(optimizer.cluster_id()).name.clone();
564 let enable_re_optimize =
565 !(matches!(explain_ctx, ExplainContext::PlanInsightsNotice(_))
566 && optimizer.duration() > opt_limit);
567 let insights_ctx = needs_plan_insights
568 .then(|| PlanInsightsContext {
569 stmt: plan
570 .select
571 .as_deref()
572 .map(Clone::clone)
573 .map(Statement::Select),
574 raw_expr: plan.source.clone(),
575 catalog,
576 compute_instances,
577 target_instance,
578 metrics: optimizer.metrics().clone(),
579 finishing: optimizer.finishing().clone(),
580 optimizer_config: optimizer.config().clone(),
581 session,
582 timestamp_context,
583 view_id: optimizer.select_id(),
584 index_id: optimizer.index_id(),
585 enable_re_optimize,
586 })
587 .map(Box::new);
588 match explain_ctx {
589 ExplainContext::Plan(explain_ctx) => {
590 let (_, df_meta, _) = global_lir_plan.unapply();
591 PeekStage::ExplainPlan(PeekStageExplainPlan {
592 validity,
593 optimizer,
594 df_meta,
595 explain_ctx,
596 insights_ctx,
597 })
598 }
599 ExplainContext::PlanInsightsNotice(optimizer_trace) => {
600 PeekStage::Finish(PeekStageFinish {
601 validity,
602 plan,
603 max_query_result_size,
604 id_bundle,
605 target_replica,
606 source_ids,
607 determination,
608 cluster_id: optimizer.cluster_id(),
609 finishing: optimizer.finishing().clone(),
610 plan_insights_optimizer_trace: Some(optimizer_trace),
611 global_lir_plan,
612 optimization_finished_at,
613 insights_ctx,
614 })
615 }
616 ExplainContext::None => PeekStage::Finish(PeekStageFinish {
617 validity,
618 plan,
619 max_query_result_size,
620 id_bundle,
621 target_replica,
622 source_ids,
623 determination,
624 cluster_id: optimizer.cluster_id(),
625 finishing: optimizer.finishing().clone(),
626 plan_insights_optimizer_trace: None,
627 global_lir_plan,
628 optimization_finished_at,
629 insights_ctx,
630 }),
631 ExplainContext::Pushdown => {
632 let (plan, _, _) = global_lir_plan.unapply();
633 let imports = match plan {
634 PeekPlan::SlowPath(plan) => plan
635 .desc
636 .source_imports
637 .into_iter()
638 .filter_map(|(id, import)| {
639 import.desc.arguments.operators.map(|mfp| (id, mfp))
640 })
641 .collect(),
642 PeekPlan::FastPath(_) => BTreeMap::default(),
643 };
644 PeekStage::ExplainPushdown(PeekStageExplainPushdown {
645 validity,
646 determination,
647 imports,
648 })
649 }
650 }
651 }
652 Ok(optimize::PeekGlobalLirPlan::CopyTo(global_lir_plan)) => {
653 let optimizer = optimizer.into_copy_to().expect("a COPY TO optimizer");
654 PeekStage::CopyToPreflight(PeekStageCopyTo {
655 validity,
656 optimizer,
657 global_lir_plan,
658 optimization_finished_at,
659 source_ids,
660 })
661 }
662 Err(err) => {
665 let Some(optimizer) = optimizer.into_select() else {
666 return Err(err);
669 };
670 let ExplainContext::Plan(explain_ctx) = explain_ctx else {
671 return Err(err);
674 };
675
676 if explain_ctx.broken {
677 tracing::error!("error while handling EXPLAIN statement: {}", err);
681 PeekStage::ExplainPlan(PeekStageExplainPlan {
682 validity,
683 optimizer,
684 df_meta: Default::default(),
685 explain_ctx,
686 insights_ctx: None,
687 })
688 } else {
689 return Err(err);
692 }
693 }
694 };
695 Ok(Box::new(stage))
696 })
697 },
698 )))
699 }
700
701 #[instrument]
702 async fn peek_real_time_recency(
703 &self,
704 session: &Session,
705 PeekStageRealTimeRecency {
706 validity,
707 plan,
708 max_query_result_size,
709 source_ids,
710 target_replica,
711 timeline_context,
712 oracle_read_ts,
713 optimizer,
714 explain_ctx,
715 }: PeekStageRealTimeRecency,
716 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
717 let fut = self
718 .determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied())
719 .await?;
720
721 match fut {
722 Some(fut) => {
723 let catalog = Arc::clone(&self.catalog);
724 let span = Span::current();
725 Ok(StageResult::Handle(mz_ore::task::spawn(
726 || "peek real time recency",
727 async move {
728 let real_time_recency_ts =
729 Coordinator::await_real_time_recent_timestamp(catalog, fut).await?;
730 let stage = PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
731 validity,
732 plan,
733 max_query_result_size,
734 target_replica,
735 timeline_context,
736 source_ids,
737 optimizer,
738 explain_ctx,
739 oracle_read_ts,
740 real_time_recency_ts: Some(real_time_recency_ts),
741 });
742 Ok(Box::new(stage))
743 }
744 .instrument(span),
745 )))
746 }
747 None => Ok(StageResult::Immediate(Box::new(
748 PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
749 validity,
750 plan,
751 max_query_result_size,
752 target_replica,
753 timeline_context,
754 source_ids,
755 optimizer,
756 explain_ctx,
757 oracle_read_ts,
758 real_time_recency_ts: None,
759 }),
760 ))),
761 }
762 }
763
764 #[instrument]
765 async fn peek_finish(
766 &mut self,
767 ctx: &mut ExecuteContext,
768 PeekStageFinish {
769 validity: _,
770 plan,
771 max_query_result_size,
772 id_bundle,
773 target_replica,
774 source_ids,
775 determination,
776 cluster_id,
777 finishing,
778 plan_insights_optimizer_trace,
779 global_lir_plan,
780 optimization_finished_at,
781 insights_ctx,
782 }: PeekStageFinish,
783 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
784 if let Some(id) = ctx.extra.contents() {
785 self.record_statement_lifecycle_event(
786 &id,
787 &StatementLifecycleEvent::OptimizationFinished,
788 optimization_finished_at,
789 );
790 }
791
792 let session = ctx.session_mut();
793 let conn_id = session.conn_id().clone();
794
795 let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
796 let source_arity = typ.arity();
797
798 emit_optimizer_notices(&*self.catalog, &*session, &df_meta.optimizer_notices);
799
800 if let Some(trace) = plan_insights_optimizer_trace {
801 let target_cluster = self.catalog().get_cluster(cluster_id);
802 let features = OptimizerFeatures::from(self.catalog().system_config())
803 .override_from(&target_cluster.config.features());
804 let insights = trace
805 .into_plan_insights(
806 &features,
807 &self.catalog().for_session(session),
808 Some(plan.finishing),
809 Some(target_cluster),
810 df_meta,
811 insights_ctx,
812 )
813 .await?;
814 session.add_notice(AdapterNotice::PlanInsights(insights));
815 }
816
817 let planned_peek = PlannedPeek {
818 plan: peek_plan,
819 determination: determination.clone(),
820 conn_id: conn_id.clone(),
821 intermediate_result_type: typ,
822 source_arity,
823 source_ids,
824 };
825
826 if let Some(transient_index_id) = match &planned_peek.plan {
827 peek::PeekPlan::FastPath(_) => None,
828 peek::PeekPlan::SlowPath(PeekDataflowPlan { id, .. }) => Some(id),
829 } {
830 if let Some(statement_logging_id) = ctx.extra.contents() {
831 self.set_transient_index_id(statement_logging_id, *transient_index_id);
832 }
833 }
834
835 if let Some(logging_id) = ctx.extra().contents() {
836 let watch_set = WatchSetCreation::new(
837 logging_id,
838 self.catalog.state(),
839 &id_bundle,
840 determination.timestamp_context.timestamp_or_default(),
841 );
842 self.install_peek_watch_sets(conn_id.clone(), watch_set).expect("the old peek sequencing re-verifies the dependencies' existence before installing the new watch sets");
843 }
844
845 let max_result_size = self.catalog().system_config().max_result_size();
846
847 let resp = self
849 .implement_peek_plan(
850 ctx.extra_mut(),
851 planned_peek,
852 finishing,
853 cluster_id,
854 target_replica,
855 max_result_size,
856 max_query_result_size,
857 )
858 .await?;
859
860 if ctx.session().vars().emit_timestamp_notice() {
861 let explanation = self.explain_timestamp(
862 ctx.session().conn_id(),
863 ctx.session().pcx().wall_time,
864 cluster_id,
865 &id_bundle,
866 determination,
867 );
868 ctx.session()
869 .add_notice(AdapterNotice::QueryTimestamp { explanation });
870 }
871
872 let resp = match plan.copy_to {
873 None => resp,
874 Some(format) => ExecuteResponse::CopyTo {
875 format,
876 resp: Box::new(resp),
877 },
878 };
879 Ok(StageResult::Response(resp))
880 }
881
882 #[instrument]
883 async fn peek_copy_to_preflight(
884 &self,
885 copy_to: PeekStageCopyTo,
886 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
887 let connection_context = self.connection_context().clone();
888 let enforce_external_addresses = mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
889 .get(self.controller.storage.config().config_set());
890 Ok(StageResult::Handle(mz_ore::task::spawn(
891 || "peek copy to preflight",
892 async move {
893 let sinks = ©_to.global_lir_plan.df_desc().sink_exports;
894 if sinks.len() != 1 {
895 return Err(AdapterError::Internal(
896 "expected exactly one copy to s3 sink".into(),
897 ));
898 }
899 let (sink_id, sink_desc) = sinks
900 .first_key_value()
901 .expect("known to be exactly one copy to s3 sink");
902 match &sink_desc.connection {
903 ComputeSinkConnection::CopyToS3Oneshot(conn) => {
904 mz_storage_types::sinks::s3_oneshot_sink::preflight(
905 connection_context,
906 &conn.aws_connection,
907 &conn.upload_info,
908 conn.connection_id,
909 *sink_id,
910 enforce_external_addresses,
911 )
912 .await?;
913 Ok(Box::new(PeekStage::CopyToDataflow(copy_to)))
914 }
915 _ => Err(AdapterError::Internal(
916 "expected copy to s3 oneshot sink".into(),
917 )),
918 }
919 },
920 )))
921 }
922
923 #[instrument]
924 async fn peek_copy_to_dataflow(
925 &mut self,
926 ctx: &ExecuteContext,
927 PeekStageCopyTo {
928 validity: _,
929 optimizer,
930 global_lir_plan,
931 optimization_finished_at,
932 source_ids,
933 }: PeekStageCopyTo,
934 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
935 if let Some(id) = ctx.extra.contents() {
936 self.record_statement_lifecycle_event(
937 &id,
938 &StatementLifecycleEvent::OptimizationFinished,
939 optimization_finished_at,
940 );
941 }
942
943 let sink_id = global_lir_plan.sink_id();
944 let cluster_id = optimizer.cluster_id();
945
946 let (df_desc, df_meta) = global_lir_plan.unapply();
947
948 emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
949
950 let (tx, rx) = oneshot::channel();
952 let active_copy_to = ActiveCopyTo {
953 conn_id: ctx.session().conn_id().clone(),
954 tx,
955 cluster_id,
956 depends_on: source_ids,
957 };
958 drop(
960 self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
961 .await,
962 );
963
964 self.ship_dataflow(df_desc, cluster_id, None).await;
966
967 let span = Span::current();
968 Ok(StageResult::HandleRetire(mz_ore::task::spawn(
969 || "peek copy to dataflow",
970 async {
971 let res = rx.await;
972 match res {
973 Ok(res) => res,
974 Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
975 }
976 }
977 .instrument(span),
978 )))
979 }
980
981 #[instrument]
982 async fn peek_explain_plan(
983 &self,
984 session: &Session,
985 PeekStageExplainPlan {
986 optimizer,
987 insights_ctx,
988 df_meta,
989 explain_ctx,
990 ..
991 }: PeekStageExplainPlan,
992 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
993 let rows = super::super::explain_plan_inner(
994 session,
995 self.catalog(),
996 df_meta,
997 explain_ctx,
998 optimizer,
999 insights_ctx,
1000 )
1001 .await?;
1002
1003 Ok(StageResult::Response(Self::send_immediate_rows(rows)))
1004 }
1005
1006 #[instrument]
1007 async fn peek_explain_pushdown(
1008 &self,
1009 session: &Session,
1010 stage: PeekStageExplainPushdown,
1011 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
1012 let as_of = stage.determination.timestamp_context.antichain();
1013 let mz_now = stage
1014 .determination
1015 .timestamp_context
1016 .timestamp()
1017 .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1018 .unwrap_or_else(ResultSpec::value_all);
1019 let fut = self
1020 .explain_pushdown_future(session, as_of, mz_now, stage.imports)
1021 .await;
1022 let span = Span::current();
1023 Ok(StageResult::HandleRetire(mz_ore::task::spawn(
1024 || "peek explain pushdown",
1025 fut.instrument(span),
1026 )))
1027 }
1028
1029 #[instrument]
1032 pub(super) fn sequence_peek_timestamp(
1033 &mut self,
1034 session: &mut Session,
1035 when: &QueryWhen,
1036 cluster_id: ClusterId,
1037 timeline_context: TimelineContext,
1038 oracle_read_ts: Option<Timestamp>,
1039 source_bundle: &CollectionIdBundle,
1040 source_ids: &BTreeSet<GlobalId>,
1041 real_time_recency_ts: Option<Timestamp>,
1042 requires_linearization: RequireLinearization,
1043 ) -> Result<TimestampDetermination, AdapterError> {
1044 let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when);
1045 let timedomain_bundle;
1046
1047 let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
1050 Some(
1052 determination @ TimestampDetermination {
1053 timestamp_context: TimestampContext::TimelineTimestamp { .. },
1054 ..
1055 },
1056 ) if in_immediate_multi_stmt_txn => (determination, None),
1057 _ => {
1058 let determine_bundle = if in_immediate_multi_stmt_txn {
1059 timedomain_bundle = timedomain_for(
1062 self.catalog(),
1063 &self.index_oracle(cluster_id),
1064 source_ids,
1065 &timeline_context,
1066 session.conn_id(),
1067 cluster_id,
1068 )?;
1069
1070 &timedomain_bundle
1071 } else {
1072 source_bundle
1074 };
1075 let (determination, read_holds) = self.determine_timestamp(
1076 session,
1077 determine_bundle,
1078 when,
1079 cluster_id,
1080 &timeline_context,
1081 oracle_read_ts,
1082 real_time_recency_ts,
1083 )?;
1084 let read_holds = match determination.timestamp_context.timestamp() {
1086 Some(_ts) => Some(read_holds),
1087 None => {
1088 drop(read_holds);
1093 None
1094 }
1095 };
1096 (determination, read_holds)
1097 }
1098 };
1099
1100 if let Some(txn_reads) = self.txn_read_holds.get(session.conn_id()) {
1109 let allowed_id_bundle = txn_reads.id_bundle();
1113
1114 drop(read_holds);
1117
1118 let outside = source_bundle.difference(&allowed_id_bundle);
1119 if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
1121 let valid_names =
1122 allowed_id_bundle.resolve_names(self.catalog(), session.conn_id());
1123 let invalid_names = outside.resolve_names(self.catalog(), session.conn_id());
1124 return Err(AdapterError::RelationOutsideTimeDomain {
1125 relations: invalid_names,
1126 names: valid_names,
1127 });
1128 }
1129 } else if let Some(read_holds) = read_holds {
1130 self.store_transaction_read_holds(session.conn_id().clone(), read_holds);
1131 }
1132
1133 let mut transaction_determination = determination.clone();
1144 if when.is_transactional() {
1145 session.add_transaction_ops(TransactionOps::Peeks {
1146 determination: transaction_determination,
1147 cluster_id,
1148 requires_linearization,
1149 })?;
1150 } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
1151 transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
1153 session.add_transaction_ops(TransactionOps::Peeks {
1154 determination: transaction_determination,
1155 cluster_id,
1156 requires_linearization,
1157 })?;
1158 };
1159
1160 Ok(determination)
1161 }
1162}