1use std::collections::{BTreeMap, BTreeSet};
11use std::sync::Arc;
12
13use mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION;
14use mz_compute_types::sinks::ComputeSinkConnection;
15use mz_controller_types::ClusterId;
16use mz_expr::{CollectionPlan, ResultSpec};
17use mz_ore::cast::CastFrom;
18use mz_ore::instrument;
19use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
20use mz_repr::{Datum, GlobalId, Timestamp};
21use mz_sql::ast::{ExplainStage, Statement};
22use mz_sql::catalog::CatalogCluster;
23use mz_sql::plan::QueryWhen;
25use mz_sql::plan::{self};
26use mz_sql::session::metadata::SessionMetadata;
27use mz_transform::EmptyStatisticsOracle;
28use tokio::sync::oneshot;
29use tracing::warn;
30use tracing::{Instrument, Span};
31
32use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
33use crate::command::ExecuteResponse;
34use crate::coord::id_bundle::CollectionIdBundle;
35use crate::coord::peek::{self, PeekDataflowPlan, PeekPlan, PlannedPeek};
36use crate::coord::sequencer::inner::return_if_err;
37use crate::coord::sequencer::{check_log_reads, emit_optimizer_notices, eval_copy_to_uri};
38use crate::coord::timeline::{TimelineContext, timedomain_for};
39use crate::coord::timestamp_selection::{
40 TimestampContext, TimestampDetermination, TimestampProvider,
41};
42use crate::coord::{
43 Coordinator, CopyToContext, ExecuteContext, ExplainContext, ExplainPlanContext, Message,
44 PeekStage, PeekStageCopyTo, PeekStageExplainPlan, PeekStageExplainPushdown, PeekStageFinish,
45 PeekStageLinearizeTimestamp, PeekStageOptimize, PeekStageRealTimeRecency,
46 PeekStageTimestampReadHold, PlanValidity, StageResult, Staged, TargetCluster,
47};
48use crate::error::AdapterError;
49use crate::explain::insights::PlanInsightsContext;
50use crate::explain::optimizer_trace::OptimizerTrace;
51use crate::notice::AdapterNotice;
52use crate::optimize;
53use crate::session::{RequireLinearization, Session, TransactionOps, TransactionStatus};
54use crate::statement_logging::StatementLifecycleEvent;
55use crate::statement_logging::WatchSetCreation;
56
57impl Staged for PeekStage {
58 type Ctx = ExecuteContext;
59
60 fn validity(&mut self) -> &mut PlanValidity {
61 match self {
62 PeekStage::LinearizeTimestamp(stage) => &mut stage.validity,
63 PeekStage::RealTimeRecency(stage) => &mut stage.validity,
64 PeekStage::TimestampReadHold(stage) => &mut stage.validity,
65 PeekStage::Optimize(stage) => &mut stage.validity,
66 PeekStage::Finish(stage) => &mut stage.validity,
67 PeekStage::ExplainPlan(stage) => &mut stage.validity,
68 PeekStage::ExplainPushdown(stage) => &mut stage.validity,
69 PeekStage::CopyToPreflight(stage) => &mut stage.validity,
70 PeekStage::CopyToDataflow(stage) => &mut stage.validity,
71 }
72 }
73
74 async fn stage(
75 self,
76 coord: &mut Coordinator,
77 ctx: &mut ExecuteContext,
78 ) -> Result<StageResult<Box<Self>>, AdapterError> {
79 match self {
80 PeekStage::LinearizeTimestamp(stage) => {
81 coord.peek_linearize_timestamp(ctx.session(), stage).await
82 }
83 PeekStage::RealTimeRecency(stage) => {
84 coord.peek_real_time_recency(ctx.session(), stage).await
85 }
86 PeekStage::TimestampReadHold(stage) => {
87 coord.peek_timestamp_read_hold(ctx.session_mut(), stage)
88 }
89 PeekStage::Optimize(stage) => coord.peek_optimize(ctx.session(), stage).await,
90 PeekStage::Finish(stage) => coord.peek_finish(ctx, stage).await,
91 PeekStage::ExplainPlan(stage) => coord.peek_explain_plan(ctx.session(), stage).await,
92 PeekStage::ExplainPushdown(stage) => {
93 coord.peek_explain_pushdown(ctx.session(), stage).await
94 }
95 PeekStage::CopyToPreflight(stage) => coord.peek_copy_to_preflight(stage).await,
96 PeekStage::CopyToDataflow(stage) => coord.peek_copy_to_dataflow(ctx, stage).await,
97 }
98 }
99
100 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
101 Message::PeekStageReady {
102 ctx,
103 span,
104 stage: self,
105 }
106 }
107
108 fn cancel_enabled(&self) -> bool {
109 true
110 }
111}
112
113impl Coordinator {
114 #[instrument]
121 pub(crate) async fn sequence_peek(
122 &mut self,
123 ctx: ExecuteContext,
124 plan: plan::SelectPlan,
125 target_cluster: TargetCluster,
126 max_query_result_size: Option<u64>,
127 ) {
128 let explain_ctx = if ctx.session().vars().emit_plan_insights_notice() {
129 let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
130 ExplainContext::PlanInsightsNotice(optimizer_trace)
131 } else {
132 ExplainContext::None
133 };
134
135 let stage = return_if_err!(
136 self.peek_validate(
137 ctx.session(),
138 plan,
139 target_cluster,
140 None,
141 explain_ctx,
142 max_query_result_size
143 ),
144 ctx
145 );
146 self.sequence_staged(ctx, Span::current(), stage).await;
147 }
148
149 #[instrument]
150 pub(crate) async fn sequence_copy_to(
151 &mut self,
152 ctx: ExecuteContext,
153 plan::CopyToPlan {
154 select_plan,
155 desc,
156 to,
157 connection,
158 connection_id,
159 format,
160 max_file_size,
161 }: plan::CopyToPlan,
162 target_cluster: TargetCluster,
163 ) {
164 let uri = return_if_err!(
165 eval_copy_to_uri(to, ctx.session(), self.catalog().state()),
166 ctx
167 );
168
169 let stage = return_if_err!(
170 self.peek_validate(
171 ctx.session(),
172 select_plan,
173 target_cluster,
174 Some(CopyToContext {
175 desc,
176 uri,
177 connection,
178 connection_id,
179 format,
180 max_file_size,
181 output_batch_count: None,
183 }),
184 ExplainContext::None,
185 Some(ctx.session().vars().max_query_result_size()),
186 ),
187 ctx
188 );
189 self.sequence_staged(ctx, Span::current(), stage).await;
190 }
191
192 #[instrument]
193 pub(crate) async fn explain_peek(
194 &mut self,
195 ctx: ExecuteContext,
196 plan::ExplainPlanPlan {
197 stage,
198 format,
199 config,
200 explainee,
201 }: plan::ExplainPlanPlan,
202 target_cluster: TargetCluster,
203 ) {
204 let plan::Explainee::Statement(stmt) = explainee else {
205 unreachable!()
208 };
209 let plan::ExplaineeStatement::Select { broken, plan, desc } = stmt else {
210 unreachable!()
213 };
214
215 let optimizer_trace = OptimizerTrace::new(stage.paths());
218
219 let stage = return_if_err!(
220 self.peek_validate(
221 ctx.session(),
222 plan,
223 target_cluster,
224 None,
225 ExplainContext::Plan(ExplainPlanContext {
226 broken,
227 config,
228 format,
229 stage,
230 replan: None,
231 desc: Some(desc),
232 optimizer_trace,
233 }),
234 Some(ctx.session().vars().max_query_result_size()),
235 ),
236 ctx
237 );
238 self.sequence_staged(ctx, Span::current(), stage).await;
239 }
240
241 #[instrument]
243 pub fn peek_validate(
244 &self,
245 session: &Session,
246 plan: mz_sql::plan::SelectPlan,
247 target_cluster: TargetCluster,
248 copy_to_ctx: Option<CopyToContext>,
249 explain_ctx: ExplainContext,
250 max_query_result_size: Option<u64>,
251 ) -> Result<PeekStage, AdapterError> {
252 let catalog = self.owned_catalog();
254 let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
255 let compute_instance = self
256 .instance_snapshot(cluster.id())
257 .expect("compute instance does not exist");
258 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
259 .override_from(&self.catalog.get_cluster(cluster.id()).config.features())
260 .override_from(&self.cluster_scoped_optimizer_overrides(cluster.id()))
261 .override_from(&explain_ctx);
262
263 if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
264 return Err(AdapterError::NoClusterReplicasAvailable {
265 name: cluster.name.clone(),
266 is_managed: cluster.is_managed(),
267 });
268 }
269
270 let optimizer = match copy_to_ctx {
271 None => {
272 let (_, view_id) = self.allocate_transient_id();
274 let (_, index_id) = self.allocate_transient_id();
275
276 optimize::PeekOptimizer::Select(optimize::peek::Optimizer::new(
278 Arc::clone(&catalog),
279 compute_instance,
280 plan.finishing.clone(),
281 view_id,
282 index_id,
283 optimizer_config,
284 self.optimizer_metrics(),
285 ))
286 }
287 Some(mut copy_to_ctx) => {
288 let worker_counts = cluster.replicas().map(|r| {
292 let loc = &r.config.location;
293 loc.workers().unwrap_or_else(|| loc.num_processes())
294 });
295 let max_worker_count = match worker_counts.max() {
296 Some(count) => u64::cast_from(count),
297 None => {
298 return Err(AdapterError::NoClusterReplicasAvailable {
299 name: cluster.name.clone(),
300 is_managed: cluster.is_managed(),
301 });
302 }
303 };
304 copy_to_ctx.output_batch_count = Some(max_worker_count);
305 let (_, view_id) = self.allocate_transient_id();
306 optimize::PeekOptimizer::CopyTo(optimize::copy_to::Optimizer::new(
308 Arc::clone(&catalog),
309 compute_instance,
310 view_id,
311 copy_to_ctx,
312 optimizer_config,
313 self.optimizer_metrics(),
314 ))
315 }
316 };
317
318 let target_replica_name = session.vars().cluster_replica();
319 let mut target_replica = target_replica_name
320 .map(|name| {
321 cluster
322 .replica_id(name)
323 .ok_or(AdapterError::UnknownClusterReplica {
324 cluster_name: cluster.name.clone(),
325 replica_name: name.to_string(),
326 })
327 })
328 .transpose()?;
329
330 let source_ids = plan.source.depends_on();
331 let mut timeline_context = self
332 .catalog()
333 .validate_timeline_context(source_ids.iter().copied())?;
334 if matches!(timeline_context, TimelineContext::TimestampIndependent)
335 && plan.source.contains_temporal()
336 {
337 timeline_context = TimelineContext::TimestampDependent;
341 }
342
343 let notices = check_log_reads(
344 &catalog,
345 cluster,
346 &source_ids,
347 &mut target_replica,
348 session.vars(),
349 )?;
350 session.add_notices(notices);
351
352 let dependencies = source_ids
353 .iter()
354 .map(|id| self.catalog.resolve_item_id(id))
355 .collect();
356 let validity = PlanValidity::new(
357 catalog.transient_revision(),
358 dependencies,
359 Some(cluster.id()),
360 target_replica,
361 session.role_metadata().clone(),
362 );
363
364 Ok(PeekStage::LinearizeTimestamp(PeekStageLinearizeTimestamp {
365 validity,
366 plan,
367 max_query_result_size,
368 source_ids,
369 target_replica,
370 timeline_context,
371 optimizer,
372 explain_ctx,
373 }))
374 }
375
376 #[instrument]
378 async fn peek_linearize_timestamp(
379 &self,
380 session: &Session,
381 PeekStageLinearizeTimestamp {
382 validity,
383 source_ids,
384 plan,
385 max_query_result_size,
386 target_replica,
387 timeline_context,
388 optimizer,
389 explain_ctx,
390 }: PeekStageLinearizeTimestamp,
391 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
392 let isolation_level = session.vars().transaction_isolation().clone();
393 let timeline = Coordinator::get_timeline(&timeline_context);
394 let needs_linearized_read_ts =
395 Coordinator::needs_linearized_read_ts(&isolation_level, &plan.when);
396
397 let build_stage = move |oracle_read_ts: Option<Timestamp>| PeekStageRealTimeRecency {
398 validity,
399 plan,
400 max_query_result_size,
401 source_ids,
402 target_replica,
403 timeline_context,
404 oracle_read_ts,
405 optimizer,
406 explain_ctx,
407 };
408
409 match timeline {
410 Some(timeline) if needs_linearized_read_ts => {
411 let oracle = self.get_timestamp_oracle(&timeline);
412
413 let span = Span::current();
417 Ok(StageResult::Handle(mz_ore::task::spawn(
418 || "linearize timestamp",
419 async move {
420 let oracle_read_ts = oracle.read_ts().await;
421 let stage = build_stage(Some(oracle_read_ts));
422 let stage = PeekStage::RealTimeRecency(stage);
423 Ok(Box::new(stage))
424 }
425 .instrument(span),
426 )))
427 }
428 Some(_) | None => {
429 let stage = build_stage(None);
430 let stage = PeekStage::RealTimeRecency(stage);
431 Ok(StageResult::Immediate(Box::new(stage)))
432 }
433 }
434 }
435
436 #[instrument]
438 fn peek_timestamp_read_hold(
439 &mut self,
440 session: &mut Session,
441 PeekStageTimestampReadHold {
442 mut validity,
443 plan,
444 max_query_result_size,
445 source_ids,
446 target_replica,
447 timeline_context,
448 oracle_read_ts,
449 real_time_recency_ts,
450 optimizer,
451 explain_ctx,
452 }: PeekStageTimestampReadHold,
453 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
454 let cluster_id = optimizer.cluster_id();
455 let id_bundle = self
456 .dataflow_builder(cluster_id)
457 .sufficient_collections(source_ids.iter().copied());
458
459 let item_ids = id_bundle
462 .iter()
463 .map(|id| self.catalog().resolve_item_id(&id));
464 validity.extend_dependencies(item_ids);
465
466 let determination = self.sequence_peek_timestamp(
467 session,
468 &plan.when,
469 cluster_id,
470 timeline_context,
471 oracle_read_ts,
472 &id_bundle,
473 &source_ids,
474 real_time_recency_ts,
475 (&explain_ctx).into(),
476 )?;
477
478 let stage = PeekStage::Optimize(PeekStageOptimize {
479 validity,
480 plan,
481 max_query_result_size,
482 source_ids,
483 id_bundle,
484 target_replica,
485 determination,
486 optimizer,
487 explain_ctx,
488 });
489 Ok(StageResult::Immediate(Box::new(stage)))
490 }
491
492 #[instrument]
493 async fn peek_optimize(
494 &self,
495 session: &Session,
496 PeekStageOptimize {
497 validity,
498 plan,
499 max_query_result_size,
500 source_ids,
501 id_bundle,
502 target_replica,
503 determination,
504 mut optimizer,
505 explain_ctx,
506 }: PeekStageOptimize,
507 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
508 let timestamp_context = determination.timestamp_context.clone();
511 let stats = self
512 .statistics_oracle(session, &source_ids, ×tamp_context.antichain(), true)
513 .await
514 .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
515 let session = session.meta();
516 let now = self.catalog().config().now.clone();
517 let catalog = self.owned_catalog();
518 let mut compute_instances = BTreeMap::new();
519 if explain_ctx.needs_plan_insights() {
520 for cluster in self.catalog().user_clusters() {
523 let snapshot = self.instance_snapshot(cluster.id).expect("must exist");
524 compute_instances.insert(cluster.name.clone(), snapshot);
525 }
526 }
527
528 let span = Span::current();
529 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
530 || "optimize peek",
531 move || {
532 span.in_scope(|| {
533 let pipeline_result = {
539 let _dispatch_guard = explain_ctx.dispatch_guard();
540
541 let raw_expr = plan.source.clone();
542
543 optimizer
544 .optimize(raw_expr, timestamp_context.clone(), &session, stats)
545 .map_err(AdapterError::from)
546 };
547
548 let optimization_finished_at = now();
549
550 let stage = match pipeline_result {
551 Ok(optimize::PeekGlobalLirPlan::Select(global_lir_plan)) => {
552 let optimizer =
553 optimizer.into_select().expect("a SELECT/EXPLAIN optimizer");
554 let needs_plan_insights = explain_ctx.needs_plan_insights();
556 let opt_limit =
561 PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
562 .get(catalog.system_config().dyncfgs());
563 let target_instance =
564 catalog.get_cluster(optimizer.cluster_id()).name.clone();
565 let enable_re_optimize =
566 !(matches!(explain_ctx, ExplainContext::PlanInsightsNotice(_))
567 && optimizer.duration() > opt_limit);
568 let insights_ctx = needs_plan_insights
569 .then(|| PlanInsightsContext {
570 stmt: plan
571 .select
572 .as_deref()
573 .map(Clone::clone)
574 .map(Statement::Select),
575 raw_expr: plan.source.clone(),
576 catalog,
577 compute_instances,
578 target_instance,
579 metrics: optimizer.metrics().clone(),
580 finishing: optimizer.finishing().clone(),
581 optimizer_config: optimizer.config().clone(),
582 session,
583 timestamp_context,
584 view_id: optimizer.select_id(),
585 index_id: optimizer.index_id(),
586 enable_re_optimize,
587 })
588 .map(Box::new);
589 match explain_ctx {
590 ExplainContext::Plan(explain_ctx) => {
591 let (_, df_meta, _) = global_lir_plan.unapply();
592 PeekStage::ExplainPlan(PeekStageExplainPlan {
593 validity,
594 optimizer,
595 df_meta,
596 explain_ctx,
597 insights_ctx,
598 })
599 }
600 ExplainContext::PlanInsightsNotice(optimizer_trace) => {
601 PeekStage::Finish(PeekStageFinish {
602 validity,
603 plan,
604 max_query_result_size,
605 id_bundle,
606 target_replica,
607 source_ids,
608 determination,
609 cluster_id: optimizer.cluster_id(),
610 finishing: optimizer.finishing().clone(),
611 plan_insights_optimizer_trace: Some(optimizer_trace),
612 global_lir_plan,
613 optimization_finished_at,
614 insights_ctx,
615 })
616 }
617 ExplainContext::None => PeekStage::Finish(PeekStageFinish {
618 validity,
619 plan,
620 max_query_result_size,
621 id_bundle,
622 target_replica,
623 source_ids,
624 determination,
625 cluster_id: optimizer.cluster_id(),
626 finishing: optimizer.finishing().clone(),
627 plan_insights_optimizer_trace: None,
628 global_lir_plan,
629 optimization_finished_at,
630 insights_ctx,
631 }),
632 ExplainContext::Pushdown => {
633 let (plan, _, _) = global_lir_plan.unapply();
634 let imports = match plan {
635 PeekPlan::SlowPath(plan) => plan
636 .desc
637 .source_imports
638 .into_iter()
639 .filter_map(|(id, import)| {
640 import.desc.arguments.operators.map(|mfp| (id, mfp))
641 })
642 .collect(),
643 PeekPlan::FastPath(_) => BTreeMap::default(),
644 };
645 PeekStage::ExplainPushdown(PeekStageExplainPushdown {
646 validity,
647 determination,
648 imports,
649 })
650 }
651 }
652 }
653 Ok(optimize::PeekGlobalLirPlan::CopyTo(global_lir_plan)) => {
654 let optimizer = optimizer.into_copy_to().expect("a COPY TO optimizer");
655 PeekStage::CopyToPreflight(PeekStageCopyTo {
656 validity,
657 optimizer,
658 global_lir_plan,
659 optimization_finished_at,
660 source_ids,
661 })
662 }
663 Err(err) => {
666 let Some(optimizer) = optimizer.into_select() else {
667 return Err(err);
670 };
671 let ExplainContext::Plan(explain_ctx) = explain_ctx else {
672 return Err(err);
675 };
676
677 if explain_ctx.broken {
678 tracing::error!("error while handling EXPLAIN statement: {}", err);
682 PeekStage::ExplainPlan(PeekStageExplainPlan {
683 validity,
684 optimizer,
685 df_meta: Default::default(),
686 explain_ctx,
687 insights_ctx: None,
688 })
689 } else {
690 return Err(err);
693 }
694 }
695 };
696 Ok(Box::new(stage))
697 })
698 },
699 )))
700 }
701
702 #[instrument]
703 async fn peek_real_time_recency(
704 &self,
705 session: &Session,
706 PeekStageRealTimeRecency {
707 validity,
708 plan,
709 max_query_result_size,
710 source_ids,
711 target_replica,
712 timeline_context,
713 oracle_read_ts,
714 optimizer,
715 explain_ctx,
716 }: PeekStageRealTimeRecency,
717 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
718 let fut = self
719 .determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied())
720 .await?;
721
722 match fut {
723 Some(fut) => {
724 let catalog = Arc::clone(&self.catalog);
725 let span = Span::current();
726 Ok(StageResult::Handle(mz_ore::task::spawn(
727 || "peek real time recency",
728 async move {
729 let real_time_recency_ts =
730 Coordinator::await_real_time_recent_timestamp(catalog, fut).await?;
731 let stage = PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
732 validity,
733 plan,
734 max_query_result_size,
735 target_replica,
736 timeline_context,
737 source_ids,
738 optimizer,
739 explain_ctx,
740 oracle_read_ts,
741 real_time_recency_ts: Some(real_time_recency_ts),
742 });
743 Ok(Box::new(stage))
744 }
745 .instrument(span),
746 )))
747 }
748 None => Ok(StageResult::Immediate(Box::new(
749 PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
750 validity,
751 plan,
752 max_query_result_size,
753 target_replica,
754 timeline_context,
755 source_ids,
756 optimizer,
757 explain_ctx,
758 oracle_read_ts,
759 real_time_recency_ts: None,
760 }),
761 ))),
762 }
763 }
764
765 #[instrument]
766 async fn peek_finish(
767 &mut self,
768 ctx: &mut ExecuteContext,
769 PeekStageFinish {
770 validity: _,
771 plan,
772 max_query_result_size,
773 id_bundle,
774 target_replica,
775 source_ids,
776 determination,
777 cluster_id,
778 finishing,
779 plan_insights_optimizer_trace,
780 global_lir_plan,
781 optimization_finished_at,
782 insights_ctx,
783 }: PeekStageFinish,
784 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
785 if let Some(id) = ctx.extra.contents() {
786 self.record_statement_lifecycle_event(
787 &id,
788 &StatementLifecycleEvent::OptimizationFinished,
789 optimization_finished_at,
790 );
791 }
792
793 let session = ctx.session_mut();
794 let conn_id = session.conn_id().clone();
795
796 let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
797 let source_arity = typ.arity();
798
799 emit_optimizer_notices(&*self.catalog, &*session, &df_meta.optimizer_notices);
800
801 if let Some(trace) = plan_insights_optimizer_trace {
802 let target_cluster = self.catalog().get_cluster(cluster_id);
803 let features = OptimizerFeatures::from(self.catalog().system_config())
804 .override_from(&target_cluster.config.features())
805 .override_from(&self.cluster_scoped_optimizer_overrides(cluster_id));
806 let insights = trace
807 .into_plan_insights(
808 &features,
809 &self.catalog().for_session(session),
810 Some(plan.finishing),
811 Some(target_cluster),
812 df_meta,
813 insights_ctx,
814 )
815 .await?;
816 session.add_notice(AdapterNotice::PlanInsights(insights));
817 }
818
819 let planned_peek = PlannedPeek {
820 plan: peek_plan,
821 determination: determination.clone(),
822 conn_id: conn_id.clone(),
823 intermediate_result_type: typ,
824 source_arity,
825 source_ids,
826 };
827
828 if let Some(transient_index_id) = match &planned_peek.plan {
829 peek::PeekPlan::FastPath(_) => None,
830 peek::PeekPlan::SlowPath(PeekDataflowPlan { id, .. }) => Some(id),
831 } {
832 if let Some(statement_logging_id) = ctx.extra.contents() {
833 self.set_transient_index_id(statement_logging_id, *transient_index_id);
834 }
835 }
836
837 if let Some(logging_id) = ctx.extra().contents() {
838 let watch_set = WatchSetCreation::new(
839 logging_id,
840 self.catalog.state(),
841 &id_bundle,
842 determination.timestamp_context.timestamp_or_default(),
843 );
844 self.install_peek_watch_sets(conn_id.clone(), watch_set).expect("the old peek sequencing re-verifies the dependencies' existence before installing the new watch sets");
845 }
846
847 let max_result_size = self.catalog().system_config().max_result_size();
848
849 let resp = self
851 .implement_peek_plan(
852 ctx.extra_mut(),
853 planned_peek,
854 finishing,
855 cluster_id,
856 target_replica,
857 max_result_size,
858 max_query_result_size,
859 )
860 .await?;
861
862 if ctx.session().vars().emit_timestamp_notice() {
863 let explanation = self.explain_timestamp(
864 ctx.session().conn_id(),
865 ctx.session().pcx().wall_time,
866 cluster_id,
867 &id_bundle,
868 determination,
869 );
870 ctx.session()
871 .add_notice(AdapterNotice::QueryTimestamp { explanation });
872 }
873
874 let resp = match plan.copy_to {
875 None => resp,
876 Some(format) => ExecuteResponse::CopyTo {
877 format,
878 resp: Box::new(resp),
879 },
880 };
881 Ok(StageResult::Response(resp))
882 }
883
884 #[instrument]
885 async fn peek_copy_to_preflight(
886 &self,
887 copy_to: PeekStageCopyTo,
888 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
889 let connection_context = self.connection_context().clone();
890 let enforce_external_addresses = mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
891 .get(self.controller.storage.config().config_set());
892 Ok(StageResult::Handle(mz_ore::task::spawn(
893 || "peek copy to preflight",
894 async move {
895 let sinks = ©_to.global_lir_plan.df_desc().sink_exports;
896 if sinks.len() != 1 {
897 return Err(AdapterError::Internal(
898 "expected exactly one copy to s3 sink".into(),
899 ));
900 }
901 let (sink_id, sink_desc) = sinks
902 .first_key_value()
903 .expect("known to be exactly one copy to s3 sink");
904 match &sink_desc.connection {
905 ComputeSinkConnection::CopyToS3Oneshot(conn) => {
906 mz_storage_types::sinks::s3_oneshot_sink::preflight(
907 connection_context,
908 &conn.aws_connection,
909 &conn.upload_info,
910 conn.connection_id,
911 *sink_id,
912 enforce_external_addresses,
913 )
914 .await?;
915 Ok(Box::new(PeekStage::CopyToDataflow(copy_to)))
916 }
917 _ => Err(AdapterError::Internal(
918 "expected copy to s3 oneshot sink".into(),
919 )),
920 }
921 },
922 )))
923 }
924
925 #[instrument]
926 async fn peek_copy_to_dataflow(
927 &mut self,
928 ctx: &ExecuteContext,
929 PeekStageCopyTo {
930 validity: _,
931 optimizer,
932 global_lir_plan,
933 optimization_finished_at,
934 source_ids,
935 }: PeekStageCopyTo,
936 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
937 if let Some(id) = ctx.extra.contents() {
938 self.record_statement_lifecycle_event(
939 &id,
940 &StatementLifecycleEvent::OptimizationFinished,
941 optimization_finished_at,
942 );
943 }
944
945 let sink_id = global_lir_plan.sink_id();
946 let cluster_id = optimizer.cluster_id();
947
948 let (df_desc, df_meta) = global_lir_plan.unapply();
949
950 emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
951
952 let (tx, rx) = oneshot::channel();
954 let active_copy_to = ActiveCopyTo {
955 conn_id: ctx.session().conn_id().clone(),
956 tx,
957 cluster_id,
958 depends_on: source_ids,
959 };
960 drop(
962 self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
963 .await,
964 );
965
966 self.ship_dataflow(df_desc, cluster_id, None).await;
968
969 let span = Span::current();
970 Ok(StageResult::HandleRetire(mz_ore::task::spawn(
971 || "peek copy to dataflow",
972 async {
973 let res = rx.await;
974 match res {
975 Ok(res) => res,
976 Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
977 }
978 }
979 .instrument(span),
980 )))
981 }
982
983 #[instrument]
984 async fn peek_explain_plan(
985 &self,
986 session: &Session,
987 PeekStageExplainPlan {
988 optimizer,
989 insights_ctx,
990 df_meta,
991 explain_ctx,
992 ..
993 }: PeekStageExplainPlan,
994 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
995 let rows = super::super::explain_plan_inner(
996 session,
997 self.catalog(),
998 df_meta,
999 explain_ctx,
1000 optimizer,
1001 insights_ctx,
1002 )
1003 .await?;
1004
1005 Ok(StageResult::Response(Self::send_immediate_rows(rows)))
1006 }
1007
1008 #[instrument]
1009 async fn peek_explain_pushdown(
1010 &self,
1011 session: &Session,
1012 stage: PeekStageExplainPushdown,
1013 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
1014 let as_of = stage.determination.timestamp_context.antichain();
1015 let mz_now = stage
1016 .determination
1017 .timestamp_context
1018 .timestamp()
1019 .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1020 .unwrap_or_else(ResultSpec::value_all);
1021 let fut = self
1022 .explain_pushdown_future(session, as_of, mz_now, stage.imports)
1023 .await;
1024 let span = Span::current();
1025 Ok(StageResult::HandleRetire(mz_ore::task::spawn(
1026 || "peek explain pushdown",
1027 fut.instrument(span),
1028 )))
1029 }
1030
1031 #[instrument]
1034 pub(super) fn sequence_peek_timestamp(
1035 &mut self,
1036 session: &mut Session,
1037 when: &QueryWhen,
1038 cluster_id: ClusterId,
1039 timeline_context: TimelineContext,
1040 oracle_read_ts: Option<Timestamp>,
1041 source_bundle: &CollectionIdBundle,
1042 source_ids: &BTreeSet<GlobalId>,
1043 real_time_recency_ts: Option<Timestamp>,
1044 requires_linearization: RequireLinearization,
1045 ) -> Result<TimestampDetermination, AdapterError> {
1046 let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when);
1047 let timedomain_bundle;
1048
1049 let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
1052 Some(
1054 determination @ TimestampDetermination {
1055 timestamp_context: TimestampContext::TimelineTimestamp { .. },
1056 ..
1057 },
1058 ) if in_immediate_multi_stmt_txn => (determination, None),
1059 _ => {
1060 let determine_bundle = if in_immediate_multi_stmt_txn {
1061 timedomain_bundle = timedomain_for(
1064 self.catalog(),
1065 &self.index_oracle(cluster_id),
1066 source_ids,
1067 &timeline_context,
1068 session.conn_id(),
1069 cluster_id,
1070 )?;
1071
1072 &timedomain_bundle
1073 } else {
1074 source_bundle
1076 };
1077 let (determination, read_holds) = self.determine_timestamp(
1078 session,
1079 determine_bundle,
1080 when,
1081 cluster_id,
1082 &timeline_context,
1083 oracle_read_ts,
1084 real_time_recency_ts,
1085 )?;
1086 let read_holds = match determination.timestamp_context.timestamp() {
1088 Some(_ts) => Some(read_holds),
1089 None => {
1090 drop(read_holds);
1095 None
1096 }
1097 };
1098 (determination, read_holds)
1099 }
1100 };
1101
1102 if let Some(txn_reads) = self.txn_read_holds.get(session.conn_id()) {
1111 let allowed_id_bundle = txn_reads.id_bundle();
1115
1116 drop(read_holds);
1119
1120 let outside = source_bundle.difference(&allowed_id_bundle);
1121 if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
1123 let valid_names =
1124 allowed_id_bundle.resolve_names(self.catalog(), session.conn_id());
1125 let invalid_names = outside.resolve_names(self.catalog(), session.conn_id());
1126 return Err(AdapterError::RelationOutsideTimeDomain {
1127 relations: invalid_names,
1128 names: valid_names,
1129 });
1130 }
1131 } else if let Some(read_holds) = read_holds {
1132 self.store_transaction_read_holds(session.conn_id().clone(), read_holds);
1133 }
1134
1135 let mut transaction_determination = determination.clone();
1146 if when.is_transactional() {
1147 session.add_transaction_ops(TransactionOps::Peeks {
1148 determination: transaction_determination,
1149 cluster_id,
1150 requires_linearization,
1151 })?;
1152 } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
1153 transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
1155 session.add_transaction_ops(TransactionOps::Peeks {
1156 determination: transaction_determination,
1157 cluster_id,
1158 requires_linearization,
1159 })?;
1160 };
1161
1162 Ok(determination)
1163 }
1164}