1use std::collections::{BTreeMap, BTreeSet};
11use std::sync::Arc;
12
13use itertools::Either;
14use mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION;
15use mz_catalog::memory::objects::CatalogItem;
16use mz_compute_types::sinks::ComputeSinkConnection;
17use mz_controller_types::ClusterId;
18use mz_expr::{CollectionPlan, ResultSpec};
19use mz_ore::cast::CastFrom;
20use mz_ore::instrument;
21use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
22use mz_repr::{Datum, GlobalId, Timestamp};
23use mz_sql::ast::{ExplainStage, Statement};
24use mz_sql::catalog::CatalogCluster;
25use mz_sql::plan::QueryWhen;
27use mz_sql::plan::{self};
28use mz_sql::session::metadata::SessionMetadata;
29use mz_transform::EmptyStatisticsOracle;
30use tokio::sync::oneshot;
31use tracing::warn;
32use tracing::{Instrument, Span};
33
34use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
35use crate::command::ExecuteResponse;
36use crate::coord::id_bundle::CollectionIdBundle;
37use crate::coord::peek::{self, PeekDataflowPlan, PeekPlan, PlannedPeek};
38use crate::coord::sequencer::inner::return_if_err;
39use crate::coord::sequencer::{check_log_reads, emit_optimizer_notices, eval_copy_to_uri};
40use crate::coord::timeline::{TimelineContext, timedomain_for};
41use crate::coord::timestamp_selection::{
42 TimestampContext, TimestampDetermination, TimestampProvider,
43};
44use crate::coord::{
45 Coordinator, CopyToContext, ExecuteContext, ExplainContext, ExplainPlanContext, Message,
46 PeekStage, PeekStageCopyTo, PeekStageExplainPlan, PeekStageExplainPushdown, PeekStageFinish,
47 PeekStageLinearizeTimestamp, PeekStageOptimize, PeekStageRealTimeRecency,
48 PeekStageTimestampReadHold, PlanValidity, StageResult, Staged, TargetCluster, WatchSetResponse,
49};
50use crate::error::AdapterError;
51use crate::explain::insights::PlanInsightsContext;
52use crate::explain::optimizer_trace::OptimizerTrace;
53use crate::notice::AdapterNotice;
54use crate::optimize::{self, Optimize};
55use crate::session::{RequireLinearization, Session, TransactionOps, TransactionStatus};
56use crate::statement_logging::StatementLifecycleEvent;
57
58impl Staged for PeekStage {
59 type Ctx = ExecuteContext;
60
61 fn validity(&mut self) -> &mut PlanValidity {
62 match self {
63 PeekStage::LinearizeTimestamp(stage) => &mut stage.validity,
64 PeekStage::RealTimeRecency(stage) => &mut stage.validity,
65 PeekStage::TimestampReadHold(stage) => &mut stage.validity,
66 PeekStage::Optimize(stage) => &mut stage.validity,
67 PeekStage::Finish(stage) => &mut stage.validity,
68 PeekStage::ExplainPlan(stage) => &mut stage.validity,
69 PeekStage::ExplainPushdown(stage) => &mut stage.validity,
70 PeekStage::CopyToPreflight(stage) => &mut stage.validity,
71 PeekStage::CopyToDataflow(stage) => &mut stage.validity,
72 }
73 }
74
75 async fn stage(
76 self,
77 coord: &mut Coordinator,
78 ctx: &mut ExecuteContext,
79 ) -> Result<StageResult<Box<Self>>, AdapterError> {
80 match self {
81 PeekStage::LinearizeTimestamp(stage) => {
82 coord.peek_linearize_timestamp(ctx.session(), stage).await
83 }
84 PeekStage::RealTimeRecency(stage) => {
85 coord.peek_real_time_recency(ctx.session(), stage).await
86 }
87 PeekStage::TimestampReadHold(stage) => {
88 coord.peek_timestamp_read_hold(ctx.session_mut(), stage)
89 }
90 PeekStage::Optimize(stage) => coord.peek_optimize(ctx.session(), stage).await,
91 PeekStage::Finish(stage) => coord.peek_finish(ctx, stage).await,
92 PeekStage::ExplainPlan(stage) => coord.peek_explain_plan(ctx.session(), stage).await,
93 PeekStage::ExplainPushdown(stage) => {
94 coord.peek_explain_pushdown(ctx.session(), stage).await
95 }
96 PeekStage::CopyToPreflight(stage) => coord.peek_copy_to_preflight(stage).await,
97 PeekStage::CopyToDataflow(stage) => coord.peek_copy_to_dataflow(ctx, stage).await,
98 }
99 }
100
101 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
102 Message::PeekStageReady {
103 ctx,
104 span,
105 stage: self,
106 }
107 }
108
109 fn cancel_enabled(&self) -> bool {
110 true
111 }
112}
113
114impl Coordinator {
115 #[instrument]
122 pub(crate) async fn sequence_peek(
123 &mut self,
124 ctx: ExecuteContext,
125 plan: plan::SelectPlan,
126 target_cluster: TargetCluster,
127 max_query_result_size: Option<u64>,
128 ) {
129 let explain_ctx = if ctx.session().vars().emit_plan_insights_notice() {
130 let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
131 ExplainContext::PlanInsightsNotice(optimizer_trace)
132 } else {
133 ExplainContext::None
134 };
135
136 let stage = return_if_err!(
137 self.peek_validate(
138 ctx.session(),
139 plan,
140 target_cluster,
141 None,
142 explain_ctx,
143 max_query_result_size
144 ),
145 ctx
146 );
147 self.sequence_staged(ctx, Span::current(), stage).await;
148 }
149
150 #[instrument]
151 pub(crate) async fn sequence_copy_to(
152 &mut self,
153 ctx: ExecuteContext,
154 plan::CopyToPlan {
155 select_plan,
156 desc,
157 to,
158 connection,
159 connection_id,
160 format,
161 max_file_size,
162 }: plan::CopyToPlan,
163 target_cluster: TargetCluster,
164 ) {
165 let uri = return_if_err!(
166 eval_copy_to_uri(to, ctx.session(), self.catalog().state()),
167 ctx
168 );
169
170 let stage = return_if_err!(
171 self.peek_validate(
172 ctx.session(),
173 select_plan,
174 target_cluster,
175 Some(CopyToContext {
176 desc,
177 uri,
178 connection,
179 connection_id,
180 format,
181 max_file_size,
182 output_batch_count: None,
184 }),
185 ExplainContext::None,
186 Some(ctx.session().vars().max_query_result_size()),
187 ),
188 ctx
189 );
190 self.sequence_staged(ctx, Span::current(), stage).await;
191 }
192
193 #[instrument]
194 pub(crate) async fn explain_peek(
195 &mut self,
196 ctx: ExecuteContext,
197 plan::ExplainPlanPlan {
198 stage,
199 format,
200 config,
201 explainee,
202 }: plan::ExplainPlanPlan,
203 target_cluster: TargetCluster,
204 ) {
205 let plan::Explainee::Statement(stmt) = explainee else {
206 unreachable!()
209 };
210 let plan::ExplaineeStatement::Select { broken, plan, desc } = stmt else {
211 unreachable!()
214 };
215
216 let optimizer_trace = OptimizerTrace::new(stage.paths());
219
220 let stage = return_if_err!(
221 self.peek_validate(
222 ctx.session(),
223 plan,
224 target_cluster,
225 None,
226 ExplainContext::Plan(ExplainPlanContext {
227 broken,
228 config,
229 format,
230 stage,
231 replan: None,
232 desc: Some(desc),
233 optimizer_trace,
234 }),
235 Some(ctx.session().vars().max_query_result_size()),
236 ),
237 ctx
238 );
239 self.sequence_staged(ctx, Span::current(), stage).await;
240 }
241
    /// Validates a `SELECT` plan (optionally the query of a `COPY ... TO`) and builds the
    /// first stage of the peek pipeline, [`PeekStage::LinearizeTimestamp`].
    ///
    /// Along the way this:
    /// * resolves `target_cluster` and snapshots its compute instance,
    /// * builds the optimizer. This is a peek optimizer when `copy_to_ctx` is `None`.
    ///   Otherwise it is a copy-to optimizer, whose `output_batch_count` is set to the
    ///   largest worker count among the cluster's replicas,
    /// * resolves the session's `cluster_replica` variable to a replica id, if set,
    /// * determines the timeline context of the plan's inputs,
    /// * adds the notices returned by `check_log_reads` to `session`, and
    /// * records the plan's dependencies in a [`PlanValidity`]. This is presumably
    ///   re-checked by later stages.
    ///
    /// # Errors
    ///
    /// * [`AdapterError::NoClusterReplicasAvailable`] if the cluster has no replicas and
    ///   either `explain_ctx` needs a cluster or this is a `COPY ... TO`.
    /// * [`AdapterError::UnknownClusterReplica`] if the session names a replica that does
    ///   not exist in the cluster.
    /// * Errors from resolving the target cluster, validating the timeline context,
    ///   checking for temporal expressions, or `check_log_reads`.
    ///
    /// # Panics
    ///
    /// Panics if the resolved cluster has no compute instance snapshot.
    #[instrument]
    pub fn peek_validate(
        &self,
        session: &Session,
        plan: mz_sql::plan::SelectPlan,
        target_cluster: TargetCluster,
        copy_to_ctx: Option<CopyToContext>,
        explain_ctx: ExplainContext,
        max_query_result_size: Option<u64>,
    ) -> Result<PeekStage, AdapterError> {
        // Owned catalog handle; it is shared with the optimizer via `Arc::clone` below.
        let catalog = self.owned_catalog();
        let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
        let compute_instance = self
            .instance_snapshot(cluster.id())
            .expect("compute instance does not exist");
        // System-wide optimizer config, overridden by the cluster's feature flags and then
        // by the explain context.
        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
            .override_from(&self.catalog.get_cluster(cluster.id()).config.features())
            .override_from(&explain_ctx);

        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
            return Err(AdapterError::NoClusterReplicasAvailable {
                name: cluster.name.clone(),
                is_managed: cluster.is_managed(),
            });
        }

        // Note: transient ids are allocated here, before the replica and timeline checks
        // below; if one of those checks fails the ids are simply never used.
        let optimizer = match copy_to_ctx {
            None => {
                let (_, view_id) = self.allocate_transient_id();
                let (_, index_id) = self.allocate_transient_id();

                Either::Left(optimize::peek::Optimizer::new(
                    Arc::clone(&catalog),
                    compute_instance,
                    plan.finishing.clone(),
                    view_id,
                    index_id,
                    optimizer_config,
                    self.optimizer_metrics(),
                ))
            }
            Some(mut copy_to_ctx) => {
                // The copy-to output is split into as many batches as the largest replica
                // has workers (falling back to the process count when a replica's
                // location does not report a worker count).
                let worker_counts = cluster.replicas().map(|r| {
                    let loc = &r.config.location;
                    loc.workers().unwrap_or_else(|| loc.num_processes())
                });
                let max_worker_count = match worker_counts.max() {
                    Some(count) => u64::cast_from(count),
                    None => {
                        // A `COPY ... TO` always needs at least one replica.
                        return Err(AdapterError::NoClusterReplicasAvailable {
                            name: cluster.name.clone(),
                            is_managed: cluster.is_managed(),
                        });
                    }
                };
                copy_to_ctx.output_batch_count = Some(max_worker_count);
                let (_, view_id) = self.allocate_transient_id();
                Either::Right(optimize::copy_to::Optimizer::new(
                    Arc::clone(&catalog),
                    compute_instance,
                    view_id,
                    copy_to_ctx,
                    optimizer_config,
                    self.optimizer_metrics(),
                ))
            }
        };

        // Resolve the session's `cluster_replica` variable (if set) to a replica id.
        let target_replica_name = session.vars().cluster_replica();
        let mut target_replica = target_replica_name
            .map(|name| {
                cluster
                    .replica_id(name)
                    .ok_or(AdapterError::UnknownClusterReplica {
                        cluster_name: cluster.name.clone(),
                        replica_name: name.to_string(),
                    })
            })
            .transpose()?;

        let source_ids = plan.source.depends_on();
        let mut timeline_context = self
            .catalog()
            .validate_timeline_context(source_ids.iter().copied())?;
        // Inputs that are timestamp independent still need a timestamp if the query
        // contains temporal expressions.
        if matches!(timeline_context, TimelineContext::TimestampIndependent)
            && plan.source.contains_temporal()?
        {
            timeline_context = TimelineContext::TimestampDependent;
        }

        // May update `target_replica` (it is passed mutably) and returns notices for the
        // session.
        let notices = check_log_reads(
            &catalog,
            cluster,
            &source_ids,
            &mut target_replica,
            session.vars(),
        )?;
        session.add_notices(notices);

        let dependencies = source_ids
            .iter()
            .map(|id| self.catalog.resolve_item_id(id))
            .collect();
        let validity = PlanValidity::new(
            catalog.transient_revision(),
            dependencies,
            Some(cluster.id()),
            target_replica,
            session.role_metadata().clone(),
        );

        Ok(PeekStage::LinearizeTimestamp(PeekStageLinearizeTimestamp {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            optimizer,
            explain_ctx,
        }))
    }
375
376 #[instrument]
378 async fn peek_linearize_timestamp(
379 &self,
380 session: &Session,
381 PeekStageLinearizeTimestamp {
382 validity,
383 source_ids,
384 plan,
385 max_query_result_size,
386 target_replica,
387 timeline_context,
388 optimizer,
389 explain_ctx,
390 }: PeekStageLinearizeTimestamp,
391 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
392 let isolation_level = session.vars().transaction_isolation().clone();
393 let timeline = Coordinator::get_timeline(&timeline_context);
394 let needs_linearized_read_ts =
395 Coordinator::needs_linearized_read_ts(&isolation_level, &plan.when);
396
397 let build_stage = move |oracle_read_ts: Option<Timestamp>| PeekStageRealTimeRecency {
398 validity,
399 plan,
400 max_query_result_size,
401 source_ids,
402 target_replica,
403 timeline_context,
404 oracle_read_ts,
405 optimizer,
406 explain_ctx,
407 };
408
409 match timeline {
410 Some(timeline) if needs_linearized_read_ts => {
411 let oracle = self.get_timestamp_oracle(&timeline);
412
413 let span = Span::current();
417 Ok(StageResult::Handle(mz_ore::task::spawn(
418 || "linearize timestamp",
419 async move {
420 let oracle_read_ts = oracle.read_ts().await;
421 let stage = build_stage(Some(oracle_read_ts));
422 let stage = PeekStage::RealTimeRecency(stage);
423 Ok(Box::new(stage))
424 }
425 .instrument(span),
426 )))
427 }
428 Some(_) | None => {
429 let stage = build_stage(None);
430 let stage = PeekStage::RealTimeRecency(stage);
431 Ok(StageResult::Immediate(Box::new(stage)))
432 }
433 }
434 }
435
436 #[instrument]
438 fn peek_timestamp_read_hold(
439 &mut self,
440 session: &mut Session,
441 PeekStageTimestampReadHold {
442 mut validity,
443 plan,
444 max_query_result_size,
445 source_ids,
446 target_replica,
447 timeline_context,
448 oracle_read_ts,
449 real_time_recency_ts,
450 optimizer,
451 explain_ctx,
452 }: PeekStageTimestampReadHold,
453 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
454 let cluster_id = match optimizer.as_ref() {
455 Either::Left(optimizer) => optimizer.cluster_id(),
456 Either::Right(optimizer) => optimizer.cluster_id(),
457 };
458 let id_bundle = self
459 .dataflow_builder(cluster_id)
460 .sufficient_collections(source_ids.iter().copied());
461
462 let item_ids = id_bundle
465 .iter()
466 .map(|id| self.catalog().resolve_item_id(&id));
467 validity.extend_dependencies(item_ids);
468
469 let determination = self.sequence_peek_timestamp(
470 session,
471 &plan.when,
472 cluster_id,
473 timeline_context,
474 oracle_read_ts,
475 &id_bundle,
476 &source_ids,
477 real_time_recency_ts,
478 (&explain_ctx).into(),
479 )?;
480
481 let stage = PeekStage::Optimize(PeekStageOptimize {
482 validity,
483 plan,
484 max_query_result_size,
485 source_ids,
486 id_bundle,
487 target_replica,
488 determination,
489 optimizer,
490 explain_ctx,
491 });
492 Ok(StageResult::Immediate(Box::new(stage)))
493 }
494
495 #[instrument]
496 async fn peek_optimize(
497 &self,
498 session: &Session,
499 PeekStageOptimize {
500 validity,
501 plan,
502 max_query_result_size,
503 source_ids,
504 id_bundle,
505 target_replica,
506 determination,
507 mut optimizer,
508 explain_ctx,
509 }: PeekStageOptimize,
510 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
511 let timestamp_context = determination.timestamp_context.clone();
514 let stats = self
515 .statistics_oracle(session, &source_ids, ×tamp_context.antichain(), true)
516 .await
517 .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
518 let session = session.meta();
519 let now = self.catalog().config().now.clone();
520 let catalog = self.owned_catalog();
521 let mut compute_instances = BTreeMap::new();
522 if explain_ctx.needs_plan_insights() {
523 for cluster in self.catalog().user_clusters() {
526 let snapshot = self.instance_snapshot(cluster.id).expect("must exist");
527 compute_instances.insert(cluster.name.clone(), snapshot);
528 }
529 }
530
531 let span = Span::current();
532 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
533 || "optimize peek",
534 move || {
535 span.in_scope(|| {
536 let pipeline = || -> Result<Either<optimize::peek::GlobalLirPlan, optimize::copy_to::GlobalLirPlan>, AdapterError> {
537 let _dispatch_guard = explain_ctx.dispatch_guard();
538
539 let raw_expr = plan.source.clone();
540
541 match optimizer.as_mut() {
542 Either::Left(optimizer) => {
544 let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
546 let local_mir_plan = local_mir_plan.resolve(timestamp_context.clone(), &session, stats);
548 let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
550
551 Ok(Either::Left(global_lir_plan))
552 }
553 Either::Right(optimizer) => {
555 let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
557 let local_mir_plan = local_mir_plan.resolve(timestamp_context.clone(), &session, stats);
559 let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
561
562 Ok(Either::Right(global_lir_plan))
563 }
564 }
565 };
566
567 let pipeline_result = pipeline();
568 let optimization_finished_at = now();
569
570 let stage = match pipeline_result {
571 Ok(Either::Left(global_lir_plan)) => {
572 let optimizer = optimizer.unwrap_left();
573 let needs_plan_insights = explain_ctx.needs_plan_insights();
575 let opt_limit = PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
580 .get(catalog.system_config().dyncfgs());
581 let target_instance = catalog
582 .get_cluster(optimizer.cluster_id())
583 .name
584 .clone();
585 let enable_re_optimize =
586 !(matches!(explain_ctx, ExplainContext::PlanInsightsNotice(_))
587 && optimizer.duration() > opt_limit);
588 let insights_ctx = needs_plan_insights.then(|| PlanInsightsContext {
589 stmt: plan.select.as_deref().map(Clone::clone).map(Statement::Select),
590 raw_expr: plan.source.clone(),
591 catalog,
592 compute_instances,
593 target_instance,
594 metrics: optimizer.metrics().clone(),
595 finishing: optimizer.finishing().clone(),
596 optimizer_config: optimizer.config().clone(),
597 session,
598 timestamp_context,
599 view_id: optimizer.select_id(),
600 index_id: optimizer.index_id(),
601 enable_re_optimize,
602 }).map(Box::new);
603 match explain_ctx {
604 ExplainContext::Plan(explain_ctx) => {
605 let (_, df_meta, _) = global_lir_plan.unapply();
606 PeekStage::ExplainPlan(PeekStageExplainPlan {
607 validity,
608 optimizer,
609 df_meta,
610 explain_ctx,
611 insights_ctx,
612 })
613 }
614 ExplainContext::PlanInsightsNotice(optimizer_trace) => {
615 PeekStage::Finish(PeekStageFinish {
616 validity,
617 plan,
618 max_query_result_size,
619 id_bundle,
620 target_replica,
621 source_ids,
622 determination,
623 cluster_id: optimizer.cluster_id(),
624 finishing: optimizer.finishing().clone(),
625 plan_insights_optimizer_trace: Some(optimizer_trace),
626 global_lir_plan,
627 optimization_finished_at,
628 insights_ctx,
629 })
630 }
631 ExplainContext::None => PeekStage::Finish(PeekStageFinish {
632 validity,
633 plan,
634 max_query_result_size,
635 id_bundle,
636 target_replica,
637 source_ids,
638 determination,
639 cluster_id: optimizer.cluster_id(),
640 finishing: optimizer.finishing().clone(),
641 plan_insights_optimizer_trace: None,
642 global_lir_plan,
643 optimization_finished_at,
644 insights_ctx,
645 }),
646 ExplainContext::Pushdown => {
647 let (plan, _, _) = global_lir_plan.unapply();
648 let imports = match plan {
649 PeekPlan::SlowPath(plan) => plan
650 .desc
651 .source_imports
652 .into_iter()
653 .filter_map(|(id, (desc, _, _upper))| {
654 desc.arguments.operators.map(|mfp| (id, mfp))
655 })
656 .collect(),
657 PeekPlan::FastPath(_) => BTreeMap::default(),
658 };
659 PeekStage::ExplainPushdown(PeekStageExplainPushdown {
660 validity,
661 determination,
662 imports,
663 })
664 }
665 }
666 }
667 Ok(Either::Right(global_lir_plan)) => {
668 let optimizer = optimizer.unwrap_right();
669 PeekStage::CopyToPreflight(PeekStageCopyTo {
670 validity,
671 optimizer,
672 global_lir_plan,
673 optimization_finished_at,
674 source_ids,
675 })
676 }
677 Err(err) => {
680 let Some(optimizer) = optimizer.left() else {
681 return Err(err);
684 };
685 let ExplainContext::Plan(explain_ctx) = explain_ctx else {
686 return Err(err);
689 };
690
691 if explain_ctx.broken {
692 tracing::error!("error while handling EXPLAIN statement: {}", err);
696 PeekStage::ExplainPlan(PeekStageExplainPlan {
697 validity,
698 optimizer,
699 df_meta: Default::default(),
700 explain_ctx,
701 insights_ctx: None,
702 })
703 } else {
704 return Err(err);
707 }
708 }
709 };
710 Ok(Box::new(stage))
711 })
712 },
713 )))
714 }
715
716 #[instrument]
717 async fn peek_real_time_recency(
718 &self,
719 session: &Session,
720 PeekStageRealTimeRecency {
721 validity,
722 plan,
723 max_query_result_size,
724 source_ids,
725 target_replica,
726 timeline_context,
727 oracle_read_ts,
728 optimizer,
729 explain_ctx,
730 }: PeekStageRealTimeRecency,
731 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
732 let fut = self
733 .determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied())
734 .await?;
735
736 match fut {
737 Some(fut) => {
738 let span = Span::current();
739 Ok(StageResult::Handle(mz_ore::task::spawn(
740 || "peek real time recency",
741 async move {
742 let real_time_recency_ts = fut.await?;
743 let stage = PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
744 validity,
745 plan,
746 max_query_result_size,
747 target_replica,
748 timeline_context,
749 source_ids,
750 optimizer,
751 explain_ctx,
752 oracle_read_ts,
753 real_time_recency_ts: Some(real_time_recency_ts),
754 });
755 Ok(Box::new(stage))
756 }
757 .instrument(span),
758 )))
759 }
760 None => Ok(StageResult::Immediate(Box::new(
761 PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
762 validity,
763 plan,
764 max_query_result_size,
765 target_replica,
766 timeline_context,
767 source_ids,
768 optimizer,
769 explain_ctx,
770 oracle_read_ts,
771 real_time_recency_ts: None,
772 }),
773 ))),
774 }
775 }
776
    /// Final stage of a regular peek: implements the optimized plan and builds the response.
    ///
    /// In order, this:
    /// 1. records `OptimizationFinished` for statement logging,
    /// 2. emits optimizer notices and, if a plan-insights trace was collected, a
    ///    [`AdapterNotice::PlanInsights`] notice,
    /// 3. for statement logging, records the transient index id of slow-path plans and
    ///    installs storage/compute watch sets on the transitive dependencies at the chosen
    ///    timestamp,
    /// 4. implements the peek via `implement_peek_plan`,
    /// 5. optionally emits an [`AdapterNotice::QueryTimestamp`] notice, and
    /// 6. wraps the response in [`ExecuteResponse::CopyTo`] when `plan.copy_to` is set.
    ///
    /// # Errors
    ///
    /// Propagates errors from building plan insights and from `implement_peek_plan`.
    #[instrument]
    async fn peek_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        PeekStageFinish {
            validity: _,
            plan,
            max_query_result_size,
            id_bundle,
            target_replica,
            source_ids,
            determination,
            cluster_id,
            finishing,
            plan_insights_optimizer_trace,
            global_lir_plan,
            optimization_finished_at,
            insights_ctx,
        }: PeekStageFinish,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        if let Some(id) = ctx.extra.contents() {
            self.record_statement_lifecycle_event(
                &id,
                &StatementLifecycleEvent::OptimizationFinished,
                optimization_finished_at,
            );
        }

        let session = ctx.session_mut();
        let conn_id = session.conn_id().clone();

        let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
        let source_arity = typ.arity();

        emit_optimizer_notices(&*self.catalog, &*session, &df_meta.optimizer_notices);

        // A trace is only present when the session asked for plan-insights notices.
        if let Some(trace) = plan_insights_optimizer_trace {
            let target_cluster = self.catalog().get_cluster(cluster_id);
            let features = OptimizerFeatures::from(self.catalog().system_config())
                .override_from(&target_cluster.config.features());
            let insights = trace
                .into_plan_insights(
                    &features,
                    &self.catalog().for_session(session),
                    Some(plan.finishing),
                    Some(target_cluster),
                    df_meta,
                    insights_ctx,
                )
                .await?;
            session.add_notice(AdapterNotice::PlanInsights(insights));
        }

        let planned_peek = PlannedPeek {
            plan: peek_plan,
            determination: determination.clone(),
            conn_id: conn_id.clone(),
            intermediate_result_type: typ,
            source_arity,
            source_ids,
        };

        // Slow-path peeks build a transient dataflow; remember its id for statement logging.
        if let Some(transient_index_id) = match &planned_peek.plan {
            peek::PeekPlan::FastPath(_) => None,
            peek::PeekPlan::SlowPath(PeekDataflowPlan { id, .. }) => Some(id),
        } {
            if let Some(statement_logging_id) = ctx.extra.contents() {
                self.set_transient_index_id(statement_logging_id, *transient_index_id);
            }
        }

        // For logged statements, watch the transitive dependencies so that statement
        // logging is told when storage and compute dependencies are ready at `ts`.
        if let Some(uuid) = ctx.extra().contents() {
            let ts = determination.timestamp_context.timestamp_or_default();
            let mut transitive_storage_deps = BTreeSet::new();
            let mut transitive_compute_deps = BTreeSet::new();
            for item_id in id_bundle
                .iter()
                .map(|gid| self.catalog.state().get_entry_by_global_id(&gid).id())
                .flat_map(|id| self.catalog.state().transitive_uses(id))
            {
                let entry = self.catalog.state().get_entry(&item_id);
                match entry.item() {
                    CatalogItem::Table(_) | CatalogItem::Source(_) => {
                        // Storage collections: watch every global id of the item.
                        transitive_storage_deps.extend(entry.global_ids());
                    }
                    CatalogItem::MaterializedView(_) | CatalogItem::Index(_) => {
                        // Compute collections: only the latest global id is watched.
                        transitive_compute_deps.insert(entry.latest_global_id());
                    }
                    _ => {}
                }
            }
            self.install_storage_watch_set(
                conn_id.clone(),
                transitive_storage_deps,
                ts,
                WatchSetResponse::StatementDependenciesReady(
                    uuid,
                    StatementLifecycleEvent::StorageDependenciesFinished,
                ),
            );
            self.install_compute_watch_set(
                conn_id,
                transitive_compute_deps,
                ts,
                WatchSetResponse::StatementDependenciesReady(
                    uuid,
                    StatementLifecycleEvent::ComputeDependenciesFinished,
                ),
            )
        }

        let max_result_size = self.catalog().system_config().max_result_size();

        let resp = self
            .implement_peek_plan(
                ctx.extra_mut(),
                planned_peek,
                finishing,
                cluster_id,
                target_replica,
                max_result_size,
                max_query_result_size,
            )
            .await?;

        if ctx.session().vars().emit_timestamp_notice() {
            let explanation =
                self.explain_timestamp(ctx.session(), cluster_id, &id_bundle, determination);
            ctx.session()
                .add_notice(AdapterNotice::QueryTimestamp { explanation });
        }

        // `COPY (<query>) TO STDOUT`-style plans wrap the peek response with their format.
        let resp = match plan.copy_to {
            None => resp,
            Some(format) => ExecuteResponse::CopyTo {
                format,
                resp: Box::new(resp),
            },
        };
        Ok(StageResult::Response(resp))
    }
925
926 #[instrument]
927 async fn peek_copy_to_preflight(
928 &self,
929 copy_to: PeekStageCopyTo,
930 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
931 let connection_context = self.connection_context().clone();
932 Ok(StageResult::Handle(mz_ore::task::spawn(
933 || "peek copy to preflight",
934 async {
935 let sinks = ©_to.global_lir_plan.df_desc().sink_exports;
936 if sinks.len() != 1 {
937 return Err(AdapterError::Internal(
938 "expected exactly one copy to s3 sink".into(),
939 ));
940 }
941 let (sink_id, sink_desc) = sinks
942 .first_key_value()
943 .expect("known to be exactly one copy to s3 sink");
944 match &sink_desc.connection {
945 ComputeSinkConnection::CopyToS3Oneshot(conn) => {
946 mz_storage_types::sinks::s3_oneshot_sink::preflight(
947 connection_context,
948 &conn.aws_connection,
949 &conn.upload_info,
950 conn.connection_id,
951 *sink_id,
952 )
953 .await?;
954 Ok(Box::new(PeekStage::CopyToDataflow(copy_to)))
955 }
956 _ => Err(AdapterError::Internal(
957 "expected copy to s3 oneshot sink".into(),
958 )),
959 }
960 },
961 )))
962 }
963
964 #[instrument]
965 async fn peek_copy_to_dataflow(
966 &mut self,
967 ctx: &ExecuteContext,
968 PeekStageCopyTo {
969 validity: _,
970 optimizer,
971 global_lir_plan,
972 optimization_finished_at,
973 source_ids,
974 }: PeekStageCopyTo,
975 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
976 if let Some(id) = ctx.extra.contents() {
977 self.record_statement_lifecycle_event(
978 &id,
979 &StatementLifecycleEvent::OptimizationFinished,
980 optimization_finished_at,
981 );
982 }
983
984 let sink_id = global_lir_plan.sink_id();
985 let cluster_id = optimizer.cluster_id();
986
987 let (df_desc, df_meta) = global_lir_plan.unapply();
988
989 emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
990
991 let (tx, rx) = oneshot::channel();
993 let active_copy_to = ActiveCopyTo {
994 conn_id: ctx.session().conn_id().clone(),
995 tx,
996 cluster_id,
997 depends_on: source_ids,
998 };
999 drop(
1001 self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
1002 .await,
1003 );
1004
1005 self.ship_dataflow(df_desc, cluster_id, None).await;
1007
1008 let span = Span::current();
1009 Ok(StageResult::HandleRetire(mz_ore::task::spawn(
1010 || "peek copy to dataflow",
1011 async {
1012 let res = rx.await;
1013 match res {
1014 Ok(res) => res,
1015 Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
1016 }
1017 }
1018 .instrument(span),
1019 )))
1020 }
1021
1022 #[instrument]
1023 async fn peek_explain_plan(
1024 &self,
1025 session: &Session,
1026 PeekStageExplainPlan {
1027 optimizer,
1028 insights_ctx,
1029 df_meta,
1030 explain_ctx,
1031 ..
1032 }: PeekStageExplainPlan,
1033 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
1034 let rows = super::super::explain_plan_inner(
1035 session,
1036 self.catalog(),
1037 df_meta,
1038 explain_ctx,
1039 optimizer,
1040 insights_ctx,
1041 )
1042 .await?;
1043
1044 Ok(StageResult::Response(Self::send_immediate_rows(rows)))
1045 }
1046
1047 #[instrument]
1048 async fn peek_explain_pushdown(
1049 &self,
1050 session: &Session,
1051 stage: PeekStageExplainPushdown,
1052 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
1053 let as_of = stage.determination.timestamp_context.antichain();
1054 let mz_now = stage
1055 .determination
1056 .timestamp_context
1057 .timestamp()
1058 .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1059 .unwrap_or_else(ResultSpec::value_all);
1060 let fut = self
1061 .explain_pushdown_future(session, as_of, mz_now, stage.imports)
1062 .await;
1063 let span = Span::current();
1064 Ok(StageResult::HandleRetire(mz_ore::task::spawn(
1065 || "peek explain pushdown",
1066 fut.instrument(span),
1067 )))
1068 }
1069
    /// Determines the timestamp for a peek and records it with the session's transaction.
    ///
    /// Inside an immediate multi-statement transaction that already has a
    /// timeline-timestamped determination, that determination is reused. Otherwise a new
    /// timestamp is determined over either:
    /// * the transaction's whole time domain (immediate multi-statement transactions), or
    /// * `source_bundle`.
    ///
    /// Read holds from a new determination are kept only if it produced a timestamp. If
    /// the connection already holds transaction read holds, the new ones are dropped and
    /// the peek must stay within the held collections. Otherwise the new holds are stored
    /// for the transaction.
    ///
    /// # Errors
    ///
    /// * [`AdapterError::RelationOutsideTimeDomain`] if the peek reads collections outside
    ///   the transaction's held collections while using a timestamp.
    /// * Errors from computing the time domain, determining the timestamp, or adding the
    ///   transaction ops to `session`.
    #[instrument]
    pub(super) fn sequence_peek_timestamp(
        &mut self,
        session: &mut Session,
        when: &QueryWhen,
        cluster_id: ClusterId,
        timeline_context: TimelineContext,
        oracle_read_ts: Option<Timestamp>,
        source_bundle: &CollectionIdBundle,
        source_ids: &BTreeSet<GlobalId>,
        real_time_recency_ts: Option<Timestamp>,
        requires_linearization: RequireLinearization,
    ) -> Result<TimestampDetermination<Timestamp>, AdapterError> {
        let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when);
        // Declared out here so that `determine_bundle` below can borrow it past the `if`.
        let timedomain_bundle;

        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
            // Reuse the transaction's existing timeline timestamp; no new read holds.
            Some(
                determination @ TimestampDetermination {
                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
                    ..
                },
            ) if in_immediate_multi_stmt_txn => (determination, None),
            _ => {
                // Within an immediate multi-statement transaction, determine the timestamp
                // over the whole time domain rather than just this query's inputs.
                let determine_bundle = if in_immediate_multi_stmt_txn {
                    timedomain_bundle = timedomain_for(
                        self.catalog(),
                        &self.index_oracle(cluster_id),
                        source_ids,
                        &timeline_context,
                        session.conn_id(),
                        cluster_id,
                    )?;

                    &timedomain_bundle
                } else {
                    source_bundle
                };
                let (determination, read_holds) = self.determine_timestamp(
                    session,
                    determine_bundle,
                    when,
                    cluster_id,
                    &timeline_context,
                    oracle_read_ts,
                    real_time_recency_ts,
                )?;
                // Without a concrete timestamp there is nothing to hold; release the holds.
                let read_holds = match determination.timestamp_context.timestamp() {
                    Some(_ts) => Some(read_holds),
                    None => {
                        drop(read_holds);
                        None
                    }
                };
                (determination, read_holds)
            }
        };

        if let Some(txn_reads) = self.txn_read_holds.get(session.conn_id()) {
            // The transaction already holds reads; only its collections may be queried.
            let allowed_id_bundle = txn_reads.id_bundle();

            // The freshly acquired holds (if any) are redundant with the existing ones.
            drop(read_holds);

            let outside = source_bundle.difference(&allowed_id_bundle);
            if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
                let valid_names =
                    allowed_id_bundle.resolve_names(self.catalog(), session.conn_id());
                let invalid_names = outside.resolve_names(self.catalog(), session.conn_id());
                return Err(AdapterError::RelationOutsideTimeDomain {
                    relations: invalid_names,
                    names: valid_names,
                });
            }
        } else if let Some(read_holds) = read_holds {
            // First timestamped read of the transaction: keep its holds for later statements.
            self.store_transaction_read_holds(session.conn_id().clone(), read_holds);
        }

        let mut transaction_determination = determination.clone();
        if when.is_transactional() {
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id,
                requires_linearization,
            })?;
        } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
            // A non-transactional peek while the session is `InTransaction` is still
            // recorded, but without a timestamp.
            transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id,
                requires_linearization,
            })?;
        };

        Ok(determination)
    }
1203}