use std::collections::{BTreeMap, BTreeSet};
use std::str::FromStr;
use std::sync::Arc;

use http::Uri;
use itertools::Either;
use maplit::btreemap;
use mz_catalog::memory::objects::CatalogItem;
use mz_compute_types::sinks::ComputeSinkConnection;
use mz_controller_types::ClusterId;
use mz_expr::{CollectionPlan, ResultSpec};
use mz_ore::cast::CastFrom;
use mz_ore::instrument;
use mz_repr::explain::{ExprHumanizerExt, TransientItem};
use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
use mz_repr::{Datum, GlobalId, RowArena, Timestamp};
use mz_sql::ast::{ExplainStage, Statement};
use mz_sql::catalog::CatalogCluster;
use mz_sql::plan::{self, HirScalarExpr, QueryWhen};
use mz_sql::session::metadata::SessionMetadata;
use mz_transform::EmptyStatisticsOracle;
use tokio::sync::oneshot;
use tracing::{warn, Instrument, Span};

use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
use crate::command::ExecuteResponse;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::peek::{self, PeekDataflowPlan, PeekPlan, PlannedPeek};
use crate::coord::sequencer::inner::{check_log_reads, return_if_err};
use crate::coord::timeline::TimelineContext;
use crate::coord::timestamp_selection::{
    TimestampContext, TimestampDetermination, TimestampProvider,
};
use crate::coord::{
    Coordinator, CopyToContext, ExecuteContext, ExplainContext, ExplainPlanContext, Message,
    PeekStage, PeekStageCopyTo, PeekStageExplainPlan, PeekStageExplainPushdown, PeekStageFinish,
    PeekStageLinearizeTimestamp, PeekStageOptimize, PeekStageRealTimeRecency,
    PeekStageTimestampReadHold, PlanValidity, StageResult, Staged, TargetCluster, WatchSetResponse,
};
use crate::error::AdapterError;
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::OptimizerTrace;
use crate::notice::AdapterNotice;
use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
use crate::optimize::{self, Optimize};
use crate::session::{RequireLinearization, Session, TransactionOps, TransactionStatus};
use crate::statement_logging::StatementLifecycleEvent;

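/// Staged execution for peeks (`SELECT`, `EXPLAIN`, and `COPY TO`): each
/// `PeekStage` variant carries its own `PlanValidity` and is dispatched to
/// the coordinator method that handles that stage.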
impl Staged for PeekStage {
    type Ctx = ExecuteContext;

    fn validity(&mut self) -> &mut PlanValidity {
        match self {
            PeekStage::LinearizeTimestamp(stage) => &mut stage.validity,
            PeekStage::RealTimeRecency(stage) => &mut stage.validity,
            PeekStage::TimestampReadHold(stage) => &mut stage.validity,
            PeekStage::Optimize(stage) => &mut stage.validity,
            PeekStage::Finish(stage) => &mut stage.validity,
            PeekStage::ExplainPlan(stage) => &mut stage.validity,
            PeekStage::ExplainPushdown(stage) => &mut stage.validity,
            PeekStage::CopyToPreflight(stage) => &mut stage.validity,
            PeekStage::CopyToDataflow(stage) => &mut stage.validity,
        }
    }

    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut ExecuteContext,
    ) -> Result<StageResult<Box<Self>>, AdapterError> {
        match self {
            PeekStage::LinearizeTimestamp(stage) => {
                coord.peek_linearize_timestamp(ctx.session(), stage).await
            }
            PeekStage::RealTimeRecency(stage) => {
                coord.peek_real_time_recency(ctx.session(), stage).await
            }
            PeekStage::TimestampReadHold(stage) => {
                coord.peek_timestamp_read_hold(ctx.session_mut(), stage)
            }
            PeekStage::Optimize(stage) => coord.peek_optimize(ctx.session(), stage).await,
            PeekStage::Finish(stage) => coord.peek_finish(ctx, stage).await,
            PeekStage::ExplainPlan(stage) => coord.peek_explain_plan(ctx.session(), stage).await,
            PeekStage::ExplainPushdown(stage) => {
                coord.peek_explain_pushdown(ctx.session(), stage).await
            }
            PeekStage::CopyToPreflight(stage) => coord.peek_copy_to_preflight(stage).await,
            PeekStage::CopyToDataflow(stage) => coord.peek_copy_to_dataflow(ctx, stage).await,
        }
    }

    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
        Message::PeekStageReady {
            ctx,
            span,
            stage: self,
        }
    }

    fn cancel_enabled(&self) -> bool {
        true
    }
}

impl Coordinator {
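    /// Sequences a peek (`SELECT`) by kicking off the staged execution
    /// pipeline. If the session asks for plan insights notices, an optimizer
    /// trace is attached so the notice can be rendered later.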
    #[instrument]
    pub(crate) async fn sequence_peek(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::SelectPlan,
        target_cluster: TargetCluster,
        max_query_result_size: Option<u64>,
    ) {
        let explain_ctx = if ctx.session().vars().emit_plan_insights_notice() {
            let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
            ExplainContext::PlanInsightsNotice(optimizer_trace)
        } else {
            ExplainContext::None
        };

        let stage = return_if_err!(
            self.peek_validate(
                ctx.session(),
                plan,
                target_cluster,
                None,
                explain_ctx,
                max_query_result_size
            ),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

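    /// Sequences a `COPY ... TO <url>` statement: evaluates the target URL
    /// expression (only `s3://...` targets are accepted), then enters the
    /// staged peek pipeline with a `CopyToContext`.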
    #[instrument]
    pub(crate) async fn sequence_copy_to(
        &mut self,
        ctx: ExecuteContext,
        plan::CopyToPlan {
            select_plan,
            desc,
            to,
            connection,
            connection_id,
            format,
            max_file_size,
        }: plan::CopyToPlan,
        target_cluster: TargetCluster,
    ) {
        let eval_uri = |to: HirScalarExpr| -> Result<Uri, AdapterError> {
            let style = ExprPrepStyle::OneShot {
                logical_time: EvalTime::NotAvailable,
                session: ctx.session(),
                catalog_state: self.catalog().state(),
            };
            let mut to = to.lower_uncorrelated()?;
            prep_scalar_expr(&mut to, style)?;
            let temp_storage = RowArena::new();
            let evaled = to.eval(&[], &temp_storage)?;
            if evaled == Datum::Null {
                coord_bail!("COPY TO target value can not be null");
            }
            let to_url = match Uri::from_str(evaled.unwrap_str()) {
                Ok(url) => {
                    if url.scheme_str() != Some("s3") {
                        coord_bail!("only 's3://...' urls are supported as COPY TO target");
                    }
                    url
                }
                Err(e) => coord_bail!("could not parse COPY TO target url: {}", e),
            };
            Ok(to_url)
        };

        let uri = return_if_err!(eval_uri(to), ctx);

        let stage = return_if_err!(
            self.peek_validate(
                ctx.session(),
                select_plan,
                target_cluster,
                Some(CopyToContext {
                    desc,
                    uri,
                    connection,
                    connection_id,
                    format,
                    max_file_size,
                    output_batch_count: None,
                }),
                ExplainContext::None,
                Some(ctx.session().vars().max_query_result_size()),
            ),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

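    /// Sequences an `EXPLAIN` of a `SELECT` statement, capturing an optimizer
    /// trace for the requested explain stage before entering the peek
    /// pipeline.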
    #[instrument]
    pub(crate) async fn explain_peek(
        &mut self,
        ctx: ExecuteContext,
        plan::ExplainPlanPlan {
            stage,
            format,
            config,
            explainee,
        }: plan::ExplainPlanPlan,
        target_cluster: TargetCluster,
    ) {
        let plan::Explainee::Statement(stmt) = explainee else {
            unreachable!()
        };
        let plan::ExplaineeStatement::Select { broken, plan, desc } = stmt else {
            unreachable!()
        };

        let optimizer_trace = OptimizerTrace::new(stage.paths());

        let stage = return_if_err!(
            self.peek_validate(
                ctx.session(),
                plan,
                target_cluster,
                None,
                ExplainContext::Plan(ExplainPlanContext {
                    broken,
                    config,
                    format,
                    stage,
                    replan: None,
                    desc: Some(desc),
                    optimizer_trace,
                }),
                Some(ctx.session().vars().max_query_result_size()),
            ),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

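    /// Validates a peek plan against the current catalog state and assembles
    /// the first stage of the pipeline: resolves the target cluster, builds
    /// the appropriate optimizer (plain peek or `COPY TO`), determines the
    /// timeline context, and records the plan's dependencies for later
    /// validity checks.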
    #[instrument]
    pub fn peek_validate(
        &self,
        session: &Session,
        plan: mz_sql::plan::SelectPlan,
        target_cluster: TargetCluster,
        copy_to_ctx: Option<CopyToContext>,
        explain_ctx: ExplainContext,
        max_query_result_size: Option<u64>,
    ) -> Result<PeekStage, AdapterError> {
        let catalog = self.owned_catalog();
        let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
        let compute_instance = self
            .instance_snapshot(cluster.id())
            .expect("compute instance does not exist");
        let (_, view_id) = self.allocate_transient_id();
        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
            .override_from(&self.catalog.get_cluster(cluster.id()).config.features())
            .override_from(&explain_ctx);

        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
            return Err(AdapterError::NoClusterReplicasAvailable {
                name: cluster.name.clone(),
                is_managed: cluster.is_managed(),
            });
        }

        let optimizer = match copy_to_ctx {
            None => {
                let compute_instance = self
                    .instance_snapshot(cluster.id())
                    .expect("compute instance does not exist");
                let (_, view_id) = self.allocate_transient_id();
                let (_, index_id) = self.allocate_transient_id();

                Either::Left(optimize::peek::Optimizer::new(
                    Arc::clone(&catalog),
                    compute_instance,
                    plan.finishing.clone(),
                    view_id,
                    index_id,
                    optimizer_config,
                    self.optimizer_metrics(),
                ))
            }
            Some(mut copy_to_ctx) => {
                let max_worker_count = match cluster
                    .replicas()
                    .map(|r| r.config.location.workers())
                    .max()
                {
                    Some(count) => u64::cast_from(count),
                    None => {
                        return Err(AdapterError::NoClusterReplicasAvailable {
                            name: cluster.name.clone(),
                            is_managed: cluster.is_managed(),
                        });
                    }
                };
                copy_to_ctx.output_batch_count = Some(max_worker_count);
                Either::Right(optimize::copy_to::Optimizer::new(
                    Arc::clone(&catalog),
                    compute_instance,
                    view_id,
                    copy_to_ctx,
                    optimizer_config,
                    self.optimizer_metrics(),
                ))
            }
        };

        let target_replica_name = session.vars().cluster_replica();
        let mut target_replica = target_replica_name
            .map(|name| {
                cluster
                    .replica_id(name)
                    .ok_or(AdapterError::UnknownClusterReplica {
                        cluster_name: cluster.name.clone(),
                        replica_name: name.to_string(),
                    })
            })
            .transpose()?;

        let source_ids = plan.source.depends_on();
        let mut timeline_context = self.validate_timeline_context(source_ids.iter().copied())?;
        if matches!(timeline_context, TimelineContext::TimestampIndependent)
            && plan.source.contains_temporal()?
        {
            timeline_context = TimelineContext::TimestampDependent;
        }

        let notices = check_log_reads(
            &catalog,
            cluster,
            &source_ids,
            &mut target_replica,
            session.vars(),
        )?;
        session.add_notices(notices);

        let dependencies = source_ids
            .iter()
            .map(|id| self.catalog.resolve_item_id(id))
            .collect();
        let validity = PlanValidity::new(
            catalog.transient_revision(),
            dependencies,
            Some(cluster.id()),
            target_replica,
            session.role_metadata().clone(),
        );

        Ok(PeekStage::LinearizeTimestamp(PeekStageLinearizeTimestamp {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            optimizer,
            explain_ctx,
        }))
    }

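    /// Linearize-timestamp stage: if the isolation level and `QueryWhen`
    /// require a linearized read, fetches a read timestamp from the
    /// timeline's oracle on a background task; otherwise proceeds
    /// immediately to the next stage.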
    #[instrument]
    async fn peek_linearize_timestamp(
        &self,
        session: &Session,
        PeekStageLinearizeTimestamp {
            validity,
            source_ids,
            plan,
            max_query_result_size,
            target_replica,
            timeline_context,
            optimizer,
            explain_ctx,
        }: PeekStageLinearizeTimestamp,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let isolation_level = session.vars().transaction_isolation().clone();
        let timeline = Coordinator::get_timeline(&timeline_context);
        let needs_linearized_read_ts =
            Coordinator::needs_linearized_read_ts(&isolation_level, &plan.when);

        let build_stage = move |oracle_read_ts: Option<Timestamp>| PeekStageRealTimeRecency {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            oracle_read_ts,
            optimizer,
            explain_ctx,
        };

        match timeline {
            Some(timeline) if needs_linearized_read_ts => {
                let oracle = self.get_timestamp_oracle(&timeline);

                let span = Span::current();
                Ok(StageResult::Handle(mz_ore::task::spawn(
                    || "linearize timestamp",
                    async move {
                        let oracle_read_ts = oracle.read_ts().await;
                        let stage = build_stage(Some(oracle_read_ts));
                        let stage = PeekStage::RealTimeRecency(stage);
                        Ok(Box::new(stage))
                    }
                    .instrument(span),
                )))
            }
            Some(_) | None => {
                let stage = build_stage(None);
                let stage = PeekStage::RealTimeRecency(stage);
                Ok(StageResult::Immediate(Box::new(stage)))
            }
        }
    }

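    /// Timestamp/read-hold stage: expands the plan's sources to a sufficient
    /// collection bundle on the target cluster, extends the plan validity
    /// with those collections, and determines the peek timestamp (acquiring
    /// read holds as needed).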
    #[instrument]
    fn peek_timestamp_read_hold(
        &mut self,
        session: &mut Session,
        PeekStageTimestampReadHold {
            mut validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            oracle_read_ts,
            real_time_recency_ts,
            optimizer,
            explain_ctx,
        }: PeekStageTimestampReadHold,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let cluster_id = match optimizer.as_ref() {
            Either::Left(optimizer) => optimizer.cluster_id(),
            Either::Right(optimizer) => optimizer.cluster_id(),
        };
        let id_bundle = self
            .dataflow_builder(cluster_id)
            .sufficient_collections(source_ids.iter().copied());

        let item_ids = id_bundle
            .iter()
            .map(|id| self.catalog().resolve_item_id(&id));
        validity.extend_dependencies(item_ids);

        let determination = self.sequence_peek_timestamp(
            session,
            &plan.when,
            cluster_id,
            timeline_context,
            oracle_read_ts,
            &id_bundle,
            &source_ids,
            real_time_recency_ts,
            (&explain_ctx).into(),
        )?;

        let stage = PeekStage::Optimize(PeekStageOptimize {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            id_bundle,
            target_replica,
            determination,
            optimizer,
            explain_ctx,
        });
        Ok(StageResult::Immediate(Box::new(stage)))
    }

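    /// Optimize stage: runs the HIR-to-LIR optimization pipeline on a
    /// blocking task and routes the result to the next stage (finish,
    /// `EXPLAIN`, pushdown `EXPLAIN`, or `COPY TO` preflight) based on the
    /// explain context and optimizer variant.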
    #[instrument]
    async fn peek_optimize(
        &self,
        session: &Session,
        PeekStageOptimize {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            id_bundle,
            target_replica,
            determination,
            mut optimizer,
            explain_ctx,
        }: PeekStageOptimize,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let timestamp_context = determination.timestamp_context.clone();
        let stats = self
            .statistics_oracle(session, &source_ids, &timestamp_context.antichain(), true)
            .await
            .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
        let session = session.meta();
        let now = self.catalog().config().now.clone();
        let catalog = self.owned_catalog();
        let mut compute_instances = BTreeMap::new();
        if explain_ctx.needs_plan_insights() {
            for cluster in self.catalog().user_clusters() {
                let snapshot = self.instance_snapshot(cluster.id).expect("must exist");
                compute_instances.insert(cluster.name.clone(), snapshot);
            }
        }

        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
            || "optimize peek",
            move || {
                span.in_scope(|| {
                    let pipeline = || -> Result<Either<optimize::peek::GlobalLirPlan, optimize::copy_to::GlobalLirPlan>, AdapterError> {
                        let _dispatch_guard = explain_ctx.dispatch_guard();

                        let raw_expr = plan.source.clone();

                        match optimizer.as_mut() {
                            Either::Left(optimizer) => {
                                let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
                                let local_mir_plan = local_mir_plan.resolve(timestamp_context.clone(), &session, stats);
                                let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;

                                Ok(Either::Left(global_lir_plan))
                            }
                            Either::Right(optimizer) => {
                                let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
                                let local_mir_plan = local_mir_plan.resolve(timestamp_context.clone(), &session, stats);
                                let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;

                                Ok(Either::Right(global_lir_plan))
                            }
                        }
                    };

                    let pipeline_result = pipeline();
                    let optimization_finished_at = now();

                    let stage = match pipeline_result {
                        Ok(Either::Left(global_lir_plan)) => {
                            let optimizer = optimizer.unwrap_left();
                            let needs_plan_insights = explain_ctx.needs_plan_insights();
                            let opt_limit = mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION.get(catalog.system_config().dyncfgs());
                            let target_instance = catalog
                                .get_cluster(optimizer.cluster_id())
                                .name
                                .clone();
                            let enable_re_optimize =
                                !(matches!(explain_ctx, ExplainContext::PlanInsightsNotice(_))
                                    && optimizer.duration() > opt_limit);
                            let insights_ctx = needs_plan_insights.then(|| PlanInsightsContext {
                                stmt: plan.select.as_deref().map(Clone::clone).map(Statement::Select),
                                raw_expr: plan.source.clone(),
                                catalog,
                                compute_instances,
                                target_instance,
                                metrics: optimizer.metrics().clone(),
                                finishing: optimizer.finishing().clone(),
                                optimizer_config: optimizer.config().clone(),
                                session,
                                timestamp_context,
                                view_id: optimizer.select_id(),
                                index_id: optimizer.index_id(),
                                enable_re_optimize,
                            }).map(Box::new);
                            match explain_ctx {
                                ExplainContext::Plan(explain_ctx) => {
                                    let (_, df_meta, _) = global_lir_plan.unapply();
                                    PeekStage::ExplainPlan(PeekStageExplainPlan {
                                        validity,
                                        optimizer,
                                        df_meta,
                                        explain_ctx,
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
                                    PeekStage::Finish(PeekStageFinish {
                                        validity,
                                        plan,
                                        max_query_result_size,
                                        id_bundle,
                                        target_replica,
                                        source_ids,
                                        determination,
                                        cluster_id: optimizer.cluster_id(),
                                        finishing: optimizer.finishing().clone(),
                                        plan_insights_optimizer_trace: Some(optimizer_trace),
                                        global_lir_plan,
                                        optimization_finished_at,
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::None => PeekStage::Finish(PeekStageFinish {
                                    validity,
                                    plan,
                                    max_query_result_size,
                                    id_bundle,
                                    target_replica,
                                    source_ids,
                                    determination,
                                    cluster_id: optimizer.cluster_id(),
                                    finishing: optimizer.finishing().clone(),
                                    plan_insights_optimizer_trace: None,
                                    global_lir_plan,
                                    optimization_finished_at,
                                    insights_ctx,
                                }),
                                ExplainContext::Pushdown => {
                                    let (plan, _, _) = global_lir_plan.unapply();
                                    let imports = match plan {
                                        PeekPlan::SlowPath(plan) => plan
                                            .desc
                                            .source_imports
                                            .into_iter()
                                            .filter_map(|(id, (desc, _, _upper))| {
                                                desc.arguments.operators.map(|mfp| (id, mfp))
                                            })
                                            .collect(),
                                        PeekPlan::FastPath(_) => BTreeMap::default(),
                                    };
                                    PeekStage::ExplainPushdown(PeekStageExplainPushdown {
                                        validity,
                                        determination,
                                        imports,
                                    })
                                }
                            }
                        }
                        Ok(Either::Right(global_lir_plan)) => {
                            let optimizer = optimizer.unwrap_right();
                            PeekStage::CopyToPreflight(PeekStageCopyTo {
                                validity,
                                optimizer,
                                global_lir_plan,
                                optimization_finished_at,
                                source_ids,
                            })
                        }
                        Err(err) => {
                            let Some(optimizer) = optimizer.left() else {
                                return Err(err);
                            };
                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
                                return Err(err);
                            };

                            if explain_ctx.broken {
                                tracing::error!("error while handling EXPLAIN statement: {}", err);
                                PeekStage::ExplainPlan(PeekStageExplainPlan {
                                    validity,
                                    optimizer,
                                    df_meta: Default::default(),
                                    explain_ctx,
                                    insights_ctx: None,
                                })
                            } else {
                                return Err(err);
                            }
                        }
                    };
                    Ok(Box::new(stage))
                })
            },
        )))
    }

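    /// Real-time-recency stage: if real-time recency applies to this peek,
    /// waits on a background task for the most recent upstream timestamps of
    /// the source collections before proceeding to timestamp determination;
    /// otherwise proceeds immediately without a recency timestamp.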
    #[instrument]
    async fn peek_real_time_recency(
        &self,
        session: &Session,
        PeekStageRealTimeRecency {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            oracle_read_ts,
            optimizer,
            explain_ctx,
        }: PeekStageRealTimeRecency,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let item_ids: Vec<_> = source_ids
            .iter()
            .map(|gid| self.catalog.resolve_item_id(gid))
            .collect();
        let fut = self
            .determine_real_time_recent_timestamp(session, item_ids.into_iter())
            .await?;

        match fut {
            Some(fut) => {
                let span = Span::current();
                Ok(StageResult::Handle(mz_ore::task::spawn(
                    || "peek real time recency",
                    async move {
                        let real_time_recency_ts = fut.await?;
                        let stage = PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
                            validity,
                            plan,
                            max_query_result_size,
                            target_replica,
                            timeline_context,
                            source_ids,
                            optimizer,
                            explain_ctx,
                            oracle_read_ts,
                            real_time_recency_ts: Some(real_time_recency_ts),
                        });
                        Ok(Box::new(stage))
                    }
                    .instrument(span),
                )))
            }
            None => Ok(StageResult::Immediate(Box::new(
                PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
                    validity,
                    plan,
                    max_query_result_size,
                    target_replica,
                    timeline_context,
                    source_ids,
                    optimizer,
                    explain_ctx,
                    oracle_read_ts,
                    real_time_recency_ts: None,
                }),
            ))),
        }
    }

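    /// Finish stage: emits optimizer notices, installs statement-lifecycle
    /// watch sets for the peek's transitive storage and compute dependencies,
    /// implements the (fast- or slow-path) peek plan, and returns the
    /// response, wrapped in `ExecuteResponse::CopyTo` when the plan is a
    /// `COPY ... TO STDOUT`.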
    #[instrument]
    async fn peek_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        PeekStageFinish {
            validity: _,
            plan,
            max_query_result_size,
            id_bundle,
            target_replica,
            source_ids,
            determination,
            cluster_id,
            finishing,
            plan_insights_optimizer_trace,
            global_lir_plan,
            optimization_finished_at,
            insights_ctx,
        }: PeekStageFinish,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        if let Some(id) = ctx.extra.contents() {
            self.record_statement_lifecycle_event(
                &id,
                &StatementLifecycleEvent::OptimizationFinished,
                optimization_finished_at,
            );
        }

        let session = ctx.session_mut();
        let conn_id = session.conn_id().clone();

        let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
        let source_arity = typ.arity();

        self.emit_optimizer_notices(&*session, &df_meta.optimizer_notices);

        let target_cluster = self.catalog().get_cluster(cluster_id);

        let features = OptimizerFeatures::from(self.catalog().system_config())
            .override_from(&target_cluster.config.features());

        if let Some(trace) = plan_insights_optimizer_trace {
            let insights = trace
                .into_plan_insights(
                    &features,
                    &self.catalog().for_session(session),
                    Some(plan.finishing),
                    Some(target_cluster),
                    df_meta,
                    insights_ctx,
                )
                .await?;
            session.add_notice(AdapterNotice::PlanInsights(insights));
        }

        let planned_peek = PlannedPeek {
            plan: peek_plan,
            determination: determination.clone(),
            conn_id: conn_id.clone(),
            source_arity,
            source_ids,
        };

        if let Some(transient_index_id) = match &planned_peek.plan {
            peek::PeekPlan::FastPath(_) => None,
            peek::PeekPlan::SlowPath(PeekDataflowPlan { id, .. }) => Some(id),
        } {
            if let Some(statement_logging_id) = ctx.extra.contents() {
                self.set_transient_index_id(statement_logging_id, *transient_index_id);
            }
        }

        if let Some(uuid) = ctx.extra().contents() {
            let ts = determination.timestamp_context.timestamp_or_default();
            let mut transitive_storage_deps = BTreeSet::new();
            let mut transitive_compute_deps = BTreeSet::new();
            for item_id in id_bundle
                .iter()
                .map(|gid| self.catalog.state().get_entry_by_global_id(&gid).id())
                .flat_map(|id| self.catalog.state().transitive_uses(id))
            {
                let entry = self.catalog.state().get_entry(&item_id);
                match entry.item() {
                    CatalogItem::Table(_) | CatalogItem::Source(_) => {
                        transitive_storage_deps.extend(entry.global_ids());
                    }
                    CatalogItem::MaterializedView(_) | CatalogItem::Index(_) => {
                        transitive_compute_deps.extend(entry.global_ids());
                    }
                    _ => {}
                }
            }
            self.install_storage_watch_set(
                conn_id.clone(),
                transitive_storage_deps,
                ts,
                WatchSetResponse::StatementDependenciesReady(
                    uuid,
                    StatementLifecycleEvent::StorageDependenciesFinished,
                ),
            );
            self.install_compute_watch_set(
                conn_id,
                transitive_compute_deps,
                ts,
                WatchSetResponse::StatementDependenciesReady(
                    uuid,
                    StatementLifecycleEvent::ComputeDependenciesFinished,
                ),
            )
        }

        let max_result_size = self.catalog().system_config().max_result_size();

        let resp = self
            .implement_peek_plan(
                ctx.extra_mut(),
                planned_peek,
                finishing,
                cluster_id,
                target_replica,
                max_result_size,
                max_query_result_size,
            )
            .await?;

        if ctx.session().vars().emit_timestamp_notice() {
            let explanation =
                self.explain_timestamp(ctx.session(), cluster_id, &id_bundle, determination);
            ctx.session()
                .add_notice(AdapterNotice::QueryTimestamp { explanation });
        }

        let resp = match plan.copy_to {
            None => resp,
            Some(format) => ExecuteResponse::CopyTo {
                format,
                resp: Box::new(resp),
            },
        };
        Ok(StageResult::Response(resp))
    }

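    /// `COPY TO` preflight stage: verifies that the optimized dataflow has
    /// exactly one S3 oneshot sink and runs the sink's preflight checks
    /// against the target S3 location before shipping the dataflow.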
    #[instrument]
    async fn peek_copy_to_preflight(
        &mut self,
        copy_to: PeekStageCopyTo,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let connection_context = self.connection_context().clone();
        Ok(StageResult::Handle(mz_ore::task::spawn(
            || "peek copy to preflight",
            async {
                let sinks = &copy_to.global_lir_plan.df_desc().sink_exports;
                if sinks.len() != 1 {
                    return Err(AdapterError::Internal(
                        "expected exactly one copy to s3 sink".into(),
                    ));
                }
                let (sink_id, sink_desc) = sinks
                    .first_key_value()
                    .expect("known to be exactly one copy to s3 sink");
                match &sink_desc.connection {
                    ComputeSinkConnection::CopyToS3Oneshot(conn) => {
                        mz_storage_types::sinks::s3_oneshot_sink::preflight(
                            connection_context,
                            &conn.aws_connection,
                            &conn.upload_info,
                            conn.connection_id,
                            *sink_id,
                        )
                        .await?;
                        Ok(Box::new(PeekStage::CopyToDataflow(copy_to)))
                    }
                    _ => Err(AdapterError::Internal(
                        "expected copy to s3 oneshot sink".into(),
                    )),
                }
            },
        )))
    }

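    /// `COPY TO` dataflow stage: registers an active `COPY TO` compute sink,
    /// ships the dataflow to the target cluster, and retires the statement
    /// once the sink reports completion over the oneshot channel.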
    #[instrument]
    async fn peek_copy_to_dataflow(
        &mut self,
        ctx: &ExecuteContext,
        PeekStageCopyTo {
            validity: _,
            optimizer,
            global_lir_plan,
            optimization_finished_at,
            source_ids,
        }: PeekStageCopyTo,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        if let Some(id) = ctx.extra.contents() {
            self.record_statement_lifecycle_event(
                &id,
                &StatementLifecycleEvent::OptimizationFinished,
                optimization_finished_at,
            );
        }

        let sink_id = global_lir_plan.sink_id();
        let cluster_id = optimizer.cluster_id();

        let (df_desc, df_meta) = global_lir_plan.unapply();

        self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);

        let (tx, rx) = oneshot::channel();
        let active_copy_to = ActiveCopyTo {
            conn_id: ctx.session().conn_id().clone(),
            tx,
            cluster_id,
            depends_on: source_ids,
        };
        drop(
            self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
                .await,
        );

        self.ship_dataflow(df_desc, cluster_id, None).await;

        let span = Span::current();
        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
            || "peek copy to dataflow",
            async {
                let res = rx.await;
                match res {
                    Ok(res) => res,
                    Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
                }
            }
            .instrument(span),
        )))
    }

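    /// `EXPLAIN` stage: renders the optimizer trace collected during the
    /// optimize stage into rows for the requested explain stage and format,
    /// humanizing the transient `SELECT` item with the plan's column names.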
    #[instrument]
    async fn peek_explain_plan(
        &self,
        session: &Session,
        PeekStageExplainPlan {
            optimizer,
            insights_ctx,
            df_meta,
            explain_ctx:
                ExplainPlanContext {
                    config,
                    format,
                    stage,
                    desc,
                    optimizer_trace,
                    ..
                },
            ..
        }: PeekStageExplainPlan,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let desc = desc.expect("RelationDesc for SelectPlan in EXPLAIN mode");

        let session_catalog = self.catalog().for_session(session);
        let expr_humanizer = {
            let transient_items = btreemap! {
                optimizer.select_id() => TransientItem::new(
                    Some(vec![GlobalId::Explain.to_string()]),
                    Some(desc.iter_names().map(|c| c.to_string()).collect()),
                )
            };
            ExprHumanizerExt::new(transient_items, &session_catalog)
        };

        let finishing = if optimizer.finishing().is_trivial(desc.arity()) {
            None
        } else {
            Some(optimizer.finishing().clone())
        };

        let target_cluster = self.catalog().get_cluster(optimizer.cluster_id());
        let features = optimizer.config().features.clone();

        let rows = optimizer_trace
            .into_rows(
                format,
                &config,
                &features,
                &expr_humanizer,
                finishing,
                Some(target_cluster),
                df_meta,
                stage,
                plan::ExplaineeStatementKind::Select,
                insights_ctx,
            )
            .await?;

        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
    }

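    /// Pushdown `EXPLAIN` stage: renders filter pushdown results for the
    /// peek's source imports at the determined as-of, resolving `mz_now()` to
    /// the peek timestamp when one exists.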
    #[instrument]
    async fn peek_explain_pushdown(
        &self,
        session: &Session,
        stage: PeekStageExplainPushdown,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let as_of = stage.determination.timestamp_context.antichain();
        let mz_now = stage
            .determination
            .timestamp_context
            .timestamp()
            .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
            .unwrap_or_else(ResultSpec::value_all);
        let fut = self
            .render_explain_pushdown_prepare(session, as_of, mz_now, stage.imports)
            .await;
        let span = Span::current();
        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
            || "peek explain pushdown",
            fut.instrument(span),
        )))
    }

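    /// Determines the timestamp for a peek: inside an immediate
    /// multi-statement transaction the transaction's existing determination
    /// is reused; otherwise a fresh timestamp is determined over the
    /// (possibly timedomain-widened) id bundle. Also enforces that reads in a
    /// transaction stay within the transaction's time domain and records the
    /// determination as a transaction op.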
    #[instrument]
    pub(super) fn sequence_peek_timestamp(
        &mut self,
        session: &mut Session,
        when: &QueryWhen,
        cluster_id: ClusterId,
        timeline_context: TimelineContext,
        oracle_read_ts: Option<Timestamp>,
        source_bundle: &CollectionIdBundle,
        source_ids: &BTreeSet<GlobalId>,
        real_time_recency_ts: Option<Timestamp>,
        requires_linearization: RequireLinearization,
    ) -> Result<TimestampDetermination<Timestamp>, AdapterError> {
        let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when);
        let timedomain_bundle;

        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
            Some(
                determination @ TimestampDetermination {
                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
                    ..
                },
            ) if in_immediate_multi_stmt_txn => (determination, None),
            _ => {
                let determine_bundle = if in_immediate_multi_stmt_txn {
                    timedomain_bundle = self.timedomain_for(
                        source_ids,
                        &timeline_context,
                        session.conn_id(),
                        cluster_id,
                    )?;

                    &timedomain_bundle
                } else {
                    source_bundle
                };
                let (determination, read_holds) = self.determine_timestamp(
                    session,
                    determine_bundle,
                    when,
                    cluster_id,
                    &timeline_context,
                    oracle_read_ts,
                    real_time_recency_ts,
                )?;
                let read_holds = match determination.timestamp_context.timestamp() {
                    Some(_ts) => Some(read_holds),
                    None => {
                        drop(read_holds);
                        None
                    }
                };
                (determination, read_holds)
            }
        };

        if let Some(txn_reads) = self.txn_read_holds.get(session.conn_id()) {
            let allowed_id_bundle = txn_reads.id_bundle();

            drop(read_holds);

            let outside = source_bundle.difference(&allowed_id_bundle);
            if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
                let valid_names =
                    self.resolve_collection_id_bundle_names(session, &allowed_id_bundle);
                let invalid_names = self.resolve_collection_id_bundle_names(session, &outside);
                return Err(AdapterError::RelationOutsideTimeDomain {
                    relations: invalid_names,
                    names: valid_names,
                });
            }
        } else if let Some(read_holds) = read_holds {
            self.store_transaction_read_holds(session, read_holds);
        }

        let mut transaction_determination = determination.clone();
        if when.is_transactional() {
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id,
                requires_linearization,
            })?;
        } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
            transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id,
                requires_linearization,
            })?;
        };

        Ok(determination)
    }
}