1use std::collections::{BTreeMap, BTreeSet};
11use std::str::FromStr;
12use std::sync::Arc;
13
14use http::Uri;
15use itertools::Either;
16use maplit::btreemap;
17use mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION;
18use mz_catalog::memory::objects::CatalogItem;
19use mz_compute_types::sinks::ComputeSinkConnection;
20use mz_controller_types::ClusterId;
21use mz_expr::{CollectionPlan, ResultSpec};
22use mz_ore::cast::CastFrom;
23use mz_ore::instrument;
24use mz_repr::explain::{ExprHumanizerExt, TransientItem};
25use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
26use mz_repr::{Datum, GlobalId, RowArena, Timestamp};
27use mz_sql::ast::{ExplainStage, Statement};
28use mz_sql::catalog::CatalogCluster;
29use mz_sql::plan::QueryWhen;
31use mz_sql::plan::{self, HirScalarExpr};
32use mz_sql::session::metadata::SessionMetadata;
33use mz_transform::EmptyStatisticsOracle;
34use tokio::sync::oneshot;
35use tracing::warn;
36use tracing::{Instrument, Span};
37
38use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
39use crate::command::ExecuteResponse;
40use crate::coord::id_bundle::CollectionIdBundle;
41use crate::coord::peek::{self, PeekDataflowPlan, PeekPlan, PlannedPeek};
42use crate::coord::sequencer::inner::{check_log_reads, return_if_err};
43use crate::coord::timeline::TimelineContext;
44use crate::coord::timestamp_selection::{
45 TimestampContext, TimestampDetermination, TimestampProvider,
46};
47use crate::coord::{
48 Coordinator, CopyToContext, ExecuteContext, ExplainContext, ExplainPlanContext, Message,
49 PeekStage, PeekStageCopyTo, PeekStageExplainPlan, PeekStageExplainPushdown, PeekStageFinish,
50 PeekStageLinearizeTimestamp, PeekStageOptimize, PeekStageRealTimeRecency,
51 PeekStageTimestampReadHold, PlanValidity, StageResult, Staged, TargetCluster, WatchSetResponse,
52};
53use crate::error::AdapterError;
54use crate::explain::insights::PlanInsightsContext;
55use crate::explain::optimizer_trace::OptimizerTrace;
56use crate::notice::AdapterNotice;
57use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
58use crate::optimize::{self, Optimize};
59use crate::session::{RequireLinearization, Session, TransactionOps, TransactionStatus};
60use crate::statement_logging::StatementLifecycleEvent;
61
62impl Staged for PeekStage {
63 type Ctx = ExecuteContext;
64
65 fn validity(&mut self) -> &mut PlanValidity {
66 match self {
67 PeekStage::LinearizeTimestamp(stage) => &mut stage.validity,
68 PeekStage::RealTimeRecency(stage) => &mut stage.validity,
69 PeekStage::TimestampReadHold(stage) => &mut stage.validity,
70 PeekStage::Optimize(stage) => &mut stage.validity,
71 PeekStage::Finish(stage) => &mut stage.validity,
72 PeekStage::ExplainPlan(stage) => &mut stage.validity,
73 PeekStage::ExplainPushdown(stage) => &mut stage.validity,
74 PeekStage::CopyToPreflight(stage) => &mut stage.validity,
75 PeekStage::CopyToDataflow(stage) => &mut stage.validity,
76 }
77 }
78
79 async fn stage(
80 self,
81 coord: &mut Coordinator,
82 ctx: &mut ExecuteContext,
83 ) -> Result<StageResult<Box<Self>>, AdapterError> {
84 match self {
85 PeekStage::LinearizeTimestamp(stage) => {
86 coord.peek_linearize_timestamp(ctx.session(), stage).await
87 }
88 PeekStage::RealTimeRecency(stage) => {
89 coord.peek_real_time_recency(ctx.session(), stage).await
90 }
91 PeekStage::TimestampReadHold(stage) => {
92 coord.peek_timestamp_read_hold(ctx.session_mut(), stage)
93 }
94 PeekStage::Optimize(stage) => coord.peek_optimize(ctx.session(), stage).await,
95 PeekStage::Finish(stage) => coord.peek_finish(ctx, stage).await,
96 PeekStage::ExplainPlan(stage) => coord.peek_explain_plan(ctx.session(), stage).await,
97 PeekStage::ExplainPushdown(stage) => {
98 coord.peek_explain_pushdown(ctx.session(), stage).await
99 }
100 PeekStage::CopyToPreflight(stage) => coord.peek_copy_to_preflight(stage).await,
101 PeekStage::CopyToDataflow(stage) => coord.peek_copy_to_dataflow(ctx, stage).await,
102 }
103 }
104
105 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
106 Message::PeekStageReady {
107 ctx,
108 span,
109 stage: self,
110 }
111 }
112
113 fn cancel_enabled(&self) -> bool {
114 true
115 }
116}
117
118impl Coordinator {
119 #[instrument]
126 pub(crate) async fn sequence_peek(
127 &mut self,
128 ctx: ExecuteContext,
129 plan: plan::SelectPlan,
130 target_cluster: TargetCluster,
131 max_query_result_size: Option<u64>,
132 ) {
133 let explain_ctx = if ctx.session().vars().emit_plan_insights_notice() {
134 let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
135 ExplainContext::PlanInsightsNotice(optimizer_trace)
136 } else {
137 ExplainContext::None
138 };
139
140 let stage = return_if_err!(
141 self.peek_validate(
142 ctx.session(),
143 plan,
144 target_cluster,
145 None,
146 explain_ctx,
147 max_query_result_size
148 ),
149 ctx
150 );
151 self.sequence_staged(ctx, Span::current(), stage).await;
152 }
153
154 #[instrument]
155 pub(crate) async fn sequence_copy_to(
156 &mut self,
157 ctx: ExecuteContext,
158 plan::CopyToPlan {
159 select_plan,
160 desc,
161 to,
162 connection,
163 connection_id,
164 format,
165 max_file_size,
166 }: plan::CopyToPlan,
167 target_cluster: TargetCluster,
168 ) {
169 let eval_uri = |to: HirScalarExpr| -> Result<Uri, AdapterError> {
170 let style = ExprPrepStyle::OneShot {
171 logical_time: EvalTime::NotAvailable,
172 session: ctx.session(),
173 catalog_state: self.catalog().state(),
174 };
175 let mut to = to.lower_uncorrelated()?;
176 prep_scalar_expr(&mut to, style)?;
177 let temp_storage = RowArena::new();
178 let evaled = to.eval(&[], &temp_storage)?;
179 if evaled == Datum::Null {
180 coord_bail!("COPY TO target value can not be null");
181 }
182 let to_url = match Uri::from_str(evaled.unwrap_str()) {
183 Ok(url) => {
184 if url.scheme_str() != Some("s3") {
185 coord_bail!("only 's3://...' urls are supported as COPY TO target");
186 }
187 url
188 }
189 Err(e) => coord_bail!("could not parse COPY TO target url: {}", e),
190 };
191 Ok(to_url)
192 };
193
194 let uri = return_if_err!(eval_uri(to), ctx);
195
196 let stage = return_if_err!(
197 self.peek_validate(
198 ctx.session(),
199 select_plan,
200 target_cluster,
201 Some(CopyToContext {
202 desc,
203 uri,
204 connection,
205 connection_id,
206 format,
207 max_file_size,
208 output_batch_count: None,
210 }),
211 ExplainContext::None,
212 Some(ctx.session().vars().max_query_result_size()),
213 ),
214 ctx
215 );
216 self.sequence_staged(ctx, Span::current(), stage).await;
217 }
218
219 #[instrument]
220 pub(crate) async fn explain_peek(
221 &mut self,
222 ctx: ExecuteContext,
223 plan::ExplainPlanPlan {
224 stage,
225 format,
226 config,
227 explainee,
228 }: plan::ExplainPlanPlan,
229 target_cluster: TargetCluster,
230 ) {
231 let plan::Explainee::Statement(stmt) = explainee else {
232 unreachable!()
235 };
236 let plan::ExplaineeStatement::Select { broken, plan, desc } = stmt else {
237 unreachable!()
240 };
241
242 let optimizer_trace = OptimizerTrace::new(stage.paths());
245
246 let stage = return_if_err!(
247 self.peek_validate(
248 ctx.session(),
249 plan,
250 target_cluster,
251 None,
252 ExplainContext::Plan(ExplainPlanContext {
253 broken,
254 config,
255 format,
256 stage,
257 replan: None,
258 desc: Some(desc),
259 optimizer_trace,
260 }),
261 Some(ctx.session().vars().max_query_result_size()),
262 ),
263 ctx
264 );
265 self.sequence_staged(ctx, Span::current(), stage).await;
266 }
267
268 #[instrument]
270 pub fn peek_validate(
271 &self,
272 session: &Session,
273 plan: mz_sql::plan::SelectPlan,
274 target_cluster: TargetCluster,
275 copy_to_ctx: Option<CopyToContext>,
276 explain_ctx: ExplainContext,
277 max_query_result_size: Option<u64>,
278 ) -> Result<PeekStage, AdapterError> {
279 let catalog = self.owned_catalog();
281 let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
282 let compute_instance = self
283 .instance_snapshot(cluster.id())
284 .expect("compute instance does not exist");
285 let (_, view_id) = self.allocate_transient_id();
286 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
287 .override_from(&self.catalog.get_cluster(cluster.id()).config.features())
288 .override_from(&explain_ctx);
289
290 if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
291 return Err(AdapterError::NoClusterReplicasAvailable {
292 name: cluster.name.clone(),
293 is_managed: cluster.is_managed(),
294 });
295 }
296
297 let optimizer = match copy_to_ctx {
298 None => {
299 let (_, view_id) = self.allocate_transient_id();
301 let (_, index_id) = self.allocate_transient_id();
302
303 Either::Left(optimize::peek::Optimizer::new(
305 Arc::clone(&catalog),
306 compute_instance,
307 plan.finishing.clone(),
308 view_id,
309 index_id,
310 optimizer_config,
311 self.optimizer_metrics(),
312 ))
313 }
314 Some(mut copy_to_ctx) => {
315 let worker_counts = cluster.replicas().map(|r| {
319 let loc = &r.config.location;
320 loc.workers().unwrap_or_else(|| loc.num_processes())
321 });
322 let max_worker_count = match worker_counts.max() {
323 Some(count) => u64::cast_from(count),
324 None => {
325 return Err(AdapterError::NoClusterReplicasAvailable {
326 name: cluster.name.clone(),
327 is_managed: cluster.is_managed(),
328 });
329 }
330 };
331 copy_to_ctx.output_batch_count = Some(max_worker_count);
332 Either::Right(optimize::copy_to::Optimizer::new(
334 Arc::clone(&catalog),
335 compute_instance,
336 view_id,
337 copy_to_ctx,
338 optimizer_config,
339 self.optimizer_metrics(),
340 ))
341 }
342 };
343
344 let target_replica_name = session.vars().cluster_replica();
345 let mut target_replica = target_replica_name
346 .map(|name| {
347 cluster
348 .replica_id(name)
349 .ok_or(AdapterError::UnknownClusterReplica {
350 cluster_name: cluster.name.clone(),
351 replica_name: name.to_string(),
352 })
353 })
354 .transpose()?;
355
356 let source_ids = plan.source.depends_on();
357 let mut timeline_context = self
358 .catalog()
359 .validate_timeline_context(source_ids.iter().copied())?;
360 if matches!(timeline_context, TimelineContext::TimestampIndependent)
361 && plan.source.contains_temporal()?
362 {
363 timeline_context = TimelineContext::TimestampDependent;
367 }
368
369 let notices = check_log_reads(
370 &catalog,
371 cluster,
372 &source_ids,
373 &mut target_replica,
374 session.vars(),
375 )?;
376 session.add_notices(notices);
377
378 let dependencies = source_ids
379 .iter()
380 .map(|id| self.catalog.resolve_item_id(id))
381 .collect();
382 let validity = PlanValidity::new(
383 catalog.transient_revision(),
384 dependencies,
385 Some(cluster.id()),
386 target_replica,
387 session.role_metadata().clone(),
388 );
389
390 Ok(PeekStage::LinearizeTimestamp(PeekStageLinearizeTimestamp {
391 validity,
392 plan,
393 max_query_result_size,
394 source_ids,
395 target_replica,
396 timeline_context,
397 optimizer,
398 explain_ctx,
399 }))
400 }
401
402 #[instrument]
404 async fn peek_linearize_timestamp(
405 &self,
406 session: &Session,
407 PeekStageLinearizeTimestamp {
408 validity,
409 source_ids,
410 plan,
411 max_query_result_size,
412 target_replica,
413 timeline_context,
414 optimizer,
415 explain_ctx,
416 }: PeekStageLinearizeTimestamp,
417 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
418 let isolation_level = session.vars().transaction_isolation().clone();
419 let timeline = Coordinator::get_timeline(&timeline_context);
420 let needs_linearized_read_ts =
421 Coordinator::needs_linearized_read_ts(&isolation_level, &plan.when);
422
423 let build_stage = move |oracle_read_ts: Option<Timestamp>| PeekStageRealTimeRecency {
424 validity,
425 plan,
426 max_query_result_size,
427 source_ids,
428 target_replica,
429 timeline_context,
430 oracle_read_ts,
431 optimizer,
432 explain_ctx,
433 };
434
435 match timeline {
436 Some(timeline) if needs_linearized_read_ts => {
437 let oracle = self.get_timestamp_oracle(&timeline);
438
439 let span = Span::current();
443 Ok(StageResult::Handle(mz_ore::task::spawn(
444 || "linearize timestamp",
445 async move {
446 let oracle_read_ts = oracle.read_ts().await;
447 let stage = build_stage(Some(oracle_read_ts));
448 let stage = PeekStage::RealTimeRecency(stage);
449 Ok(Box::new(stage))
450 }
451 .instrument(span),
452 )))
453 }
454 Some(_) | None => {
455 let stage = build_stage(None);
456 let stage = PeekStage::RealTimeRecency(stage);
457 Ok(StageResult::Immediate(Box::new(stage)))
458 }
459 }
460 }
461
462 #[instrument]
464 fn peek_timestamp_read_hold(
465 &mut self,
466 session: &mut Session,
467 PeekStageTimestampReadHold {
468 mut validity,
469 plan,
470 max_query_result_size,
471 source_ids,
472 target_replica,
473 timeline_context,
474 oracle_read_ts,
475 real_time_recency_ts,
476 optimizer,
477 explain_ctx,
478 }: PeekStageTimestampReadHold,
479 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
480 let cluster_id = match optimizer.as_ref() {
481 Either::Left(optimizer) => optimizer.cluster_id(),
482 Either::Right(optimizer) => optimizer.cluster_id(),
483 };
484 let id_bundle = self
485 .dataflow_builder(cluster_id)
486 .sufficient_collections(source_ids.iter().copied());
487
488 let item_ids = id_bundle
491 .iter()
492 .map(|id| self.catalog().resolve_item_id(&id));
493 validity.extend_dependencies(item_ids);
494
495 let determination = self.sequence_peek_timestamp(
496 session,
497 &plan.when,
498 cluster_id,
499 timeline_context,
500 oracle_read_ts,
501 &id_bundle,
502 &source_ids,
503 real_time_recency_ts,
504 (&explain_ctx).into(),
505 )?;
506
507 let stage = PeekStage::Optimize(PeekStageOptimize {
508 validity,
509 plan,
510 max_query_result_size,
511 source_ids,
512 id_bundle,
513 target_replica,
514 determination,
515 optimizer,
516 explain_ctx,
517 });
518 Ok(StageResult::Immediate(Box::new(stage)))
519 }
520
521 #[instrument]
522 async fn peek_optimize(
523 &self,
524 session: &Session,
525 PeekStageOptimize {
526 validity,
527 plan,
528 max_query_result_size,
529 source_ids,
530 id_bundle,
531 target_replica,
532 determination,
533 mut optimizer,
534 explain_ctx,
535 }: PeekStageOptimize,
536 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
537 let timestamp_context = determination.timestamp_context.clone();
540 let stats = self
541 .statistics_oracle(session, &source_ids, ×tamp_context.antichain(), true)
542 .await
543 .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
544 let session = session.meta();
545 let now = self.catalog().config().now.clone();
546 let catalog = self.owned_catalog();
547 let mut compute_instances = BTreeMap::new();
548 if explain_ctx.needs_plan_insights() {
549 for cluster in self.catalog().user_clusters() {
552 let snapshot = self.instance_snapshot(cluster.id).expect("must exist");
553 compute_instances.insert(cluster.name.clone(), snapshot);
554 }
555 }
556
557 let span = Span::current();
558 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
559 || "optimize peek",
560 move || {
561 span.in_scope(|| {
562 let pipeline = || -> Result<Either<optimize::peek::GlobalLirPlan, optimize::copy_to::GlobalLirPlan>, AdapterError> {
563 let _dispatch_guard = explain_ctx.dispatch_guard();
564
565 let raw_expr = plan.source.clone();
566
567 match optimizer.as_mut() {
568 Either::Left(optimizer) => {
570 let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
572 let local_mir_plan = local_mir_plan.resolve(timestamp_context.clone(), &session, stats);
574 let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
576
577 Ok(Either::Left(global_lir_plan))
578 }
579 Either::Right(optimizer) => {
581 let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
583 let local_mir_plan = local_mir_plan.resolve(timestamp_context.clone(), &session, stats);
585 let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
587
588 Ok(Either::Right(global_lir_plan))
589 }
590 }
591 };
592
593 let pipeline_result = pipeline();
594 let optimization_finished_at = now();
595
596 let stage = match pipeline_result {
597 Ok(Either::Left(global_lir_plan)) => {
598 let optimizer = optimizer.unwrap_left();
599 let needs_plan_insights = explain_ctx.needs_plan_insights();
601 let opt_limit = PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
606 .get(catalog.system_config().dyncfgs());
607 let target_instance = catalog
608 .get_cluster(optimizer.cluster_id())
609 .name
610 .clone();
611 let enable_re_optimize =
612 !(matches!(explain_ctx, ExplainContext::PlanInsightsNotice(_))
613 && optimizer.duration() > opt_limit);
614 let insights_ctx = needs_plan_insights.then(|| PlanInsightsContext {
615 stmt: plan.select.as_deref().map(Clone::clone).map(Statement::Select),
616 raw_expr: plan.source.clone(),
617 catalog,
618 compute_instances,
619 target_instance,
620 metrics: optimizer.metrics().clone(),
621 finishing: optimizer.finishing().clone(),
622 optimizer_config: optimizer.config().clone(),
623 session,
624 timestamp_context,
625 view_id: optimizer.select_id(),
626 index_id: optimizer.index_id(),
627 enable_re_optimize,
628 }).map(Box::new);
629 match explain_ctx {
630 ExplainContext::Plan(explain_ctx) => {
631 let (_, df_meta, _) = global_lir_plan.unapply();
632 PeekStage::ExplainPlan(PeekStageExplainPlan {
633 validity,
634 optimizer,
635 df_meta,
636 explain_ctx,
637 insights_ctx,
638 })
639 }
640 ExplainContext::PlanInsightsNotice(optimizer_trace) => {
641 PeekStage::Finish(PeekStageFinish {
642 validity,
643 plan,
644 max_query_result_size,
645 id_bundle,
646 target_replica,
647 source_ids,
648 determination,
649 cluster_id: optimizer.cluster_id(),
650 finishing: optimizer.finishing().clone(),
651 plan_insights_optimizer_trace: Some(optimizer_trace),
652 global_lir_plan,
653 optimization_finished_at,
654 insights_ctx,
655 })
656 }
657 ExplainContext::None => PeekStage::Finish(PeekStageFinish {
658 validity,
659 plan,
660 max_query_result_size,
661 id_bundle,
662 target_replica,
663 source_ids,
664 determination,
665 cluster_id: optimizer.cluster_id(),
666 finishing: optimizer.finishing().clone(),
667 plan_insights_optimizer_trace: None,
668 global_lir_plan,
669 optimization_finished_at,
670 insights_ctx,
671 }),
672 ExplainContext::Pushdown => {
673 let (plan, _, _) = global_lir_plan.unapply();
674 let imports = match plan {
675 PeekPlan::SlowPath(plan) => plan
676 .desc
677 .source_imports
678 .into_iter()
679 .filter_map(|(id, (desc, _, _upper))| {
680 desc.arguments.operators.map(|mfp| (id, mfp))
681 })
682 .collect(),
683 PeekPlan::FastPath(_) => BTreeMap::default(),
684 };
685 PeekStage::ExplainPushdown(PeekStageExplainPushdown {
686 validity,
687 determination,
688 imports,
689 })
690 }
691 }
692 }
693 Ok(Either::Right(global_lir_plan)) => {
694 let optimizer = optimizer.unwrap_right();
695 PeekStage::CopyToPreflight(PeekStageCopyTo {
696 validity,
697 optimizer,
698 global_lir_plan,
699 optimization_finished_at,
700 source_ids,
701 })
702 }
703 Err(err) => {
706 let Some(optimizer) = optimizer.left() else {
707 return Err(err);
710 };
711 let ExplainContext::Plan(explain_ctx) = explain_ctx else {
712 return Err(err);
715 };
716
717 if explain_ctx.broken {
718 tracing::error!("error while handling EXPLAIN statement: {}", err);
722 PeekStage::ExplainPlan(PeekStageExplainPlan {
723 validity,
724 optimizer,
725 df_meta: Default::default(),
726 explain_ctx,
727 insights_ctx: None,
728 })
729 } else {
730 return Err(err);
733 }
734 }
735 };
736 Ok(Box::new(stage))
737 })
738 },
739 )))
740 }
741
742 #[instrument]
743 async fn peek_real_time_recency(
744 &self,
745 session: &Session,
746 PeekStageRealTimeRecency {
747 validity,
748 plan,
749 max_query_result_size,
750 source_ids,
751 target_replica,
752 timeline_context,
753 oracle_read_ts,
754 optimizer,
755 explain_ctx,
756 }: PeekStageRealTimeRecency,
757 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
758 let item_ids: Vec<_> = source_ids
759 .iter()
760 .map(|gid| self.catalog.resolve_item_id(gid))
761 .collect();
762 let fut = self
763 .determine_real_time_recent_timestamp(session, item_ids.into_iter())
764 .await?;
765
766 match fut {
767 Some(fut) => {
768 let span = Span::current();
769 Ok(StageResult::Handle(mz_ore::task::spawn(
770 || "peek real time recency",
771 async move {
772 let real_time_recency_ts = fut.await?;
773 let stage = PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
774 validity,
775 plan,
776 max_query_result_size,
777 target_replica,
778 timeline_context,
779 source_ids,
780 optimizer,
781 explain_ctx,
782 oracle_read_ts,
783 real_time_recency_ts: Some(real_time_recency_ts),
784 });
785 Ok(Box::new(stage))
786 }
787 .instrument(span),
788 )))
789 }
790 None => Ok(StageResult::Immediate(Box::new(
791 PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
792 validity,
793 plan,
794 max_query_result_size,
795 target_replica,
796 timeline_context,
797 source_ids,
798 optimizer,
799 explain_ctx,
800 oracle_read_ts,
801 real_time_recency_ts: None,
802 }),
803 ))),
804 }
805 }
806
807 #[instrument]
808 async fn peek_finish(
809 &mut self,
810 ctx: &mut ExecuteContext,
811 PeekStageFinish {
812 validity: _,
813 plan,
814 max_query_result_size,
815 id_bundle,
816 target_replica,
817 source_ids,
818 determination,
819 cluster_id,
820 finishing,
821 plan_insights_optimizer_trace,
822 global_lir_plan,
823 optimization_finished_at,
824 insights_ctx,
825 }: PeekStageFinish,
826 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
827 if let Some(id) = ctx.extra.contents() {
828 self.record_statement_lifecycle_event(
829 &id,
830 &StatementLifecycleEvent::OptimizationFinished,
831 optimization_finished_at,
832 );
833 }
834
835 let session = ctx.session_mut();
836 let conn_id = session.conn_id().clone();
837
838 let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
839 let source_arity = typ.arity();
840
841 self.emit_optimizer_notices(&*session, &df_meta.optimizer_notices);
842
843 let target_cluster = self.catalog().get_cluster(cluster_id);
844
845 let features = OptimizerFeatures::from(self.catalog().system_config())
846 .override_from(&target_cluster.config.features());
847
848 if let Some(trace) = plan_insights_optimizer_trace {
849 let insights = trace
850 .into_plan_insights(
851 &features,
852 &self.catalog().for_session(session),
853 Some(plan.finishing),
854 Some(target_cluster),
855 df_meta,
856 insights_ctx,
857 )
858 .await?;
859 session.add_notice(AdapterNotice::PlanInsights(insights));
860 }
861
862 let planned_peek = PlannedPeek {
863 plan: peek_plan,
864 determination: determination.clone(),
865 conn_id: conn_id.clone(),
866 intermediate_result_type: typ,
867 source_arity,
868 source_ids,
869 };
870
871 if let Some(transient_index_id) = match &planned_peek.plan {
872 peek::PeekPlan::FastPath(_) => None,
873 peek::PeekPlan::SlowPath(PeekDataflowPlan { id, .. }) => Some(id),
874 } {
875 if let Some(statement_logging_id) = ctx.extra.contents() {
876 self.set_transient_index_id(statement_logging_id, *transient_index_id);
877 }
878 }
879
880 if let Some(uuid) = ctx.extra().contents() {
881 let ts = determination.timestamp_context.timestamp_or_default();
882 let mut transitive_storage_deps = BTreeSet::new();
883 let mut transitive_compute_deps = BTreeSet::new();
884 for item_id in id_bundle
885 .iter()
886 .map(|gid| self.catalog.state().get_entry_by_global_id(&gid).id())
887 .flat_map(|id| self.catalog.state().transitive_uses(id))
888 {
889 let entry = self.catalog.state().get_entry(&item_id);
890 match entry.item() {
891 CatalogItem::Table(_) | CatalogItem::Source(_) => {
896 transitive_storage_deps.extend(entry.global_ids());
897 }
898 CatalogItem::MaterializedView(_) | CatalogItem::Index(_) => {
899 transitive_compute_deps.extend(entry.global_ids());
900 }
901 _ => {}
902 }
903 }
904 self.install_storage_watch_set(
905 conn_id.clone(),
906 transitive_storage_deps,
907 ts,
908 WatchSetResponse::StatementDependenciesReady(
909 uuid,
910 StatementLifecycleEvent::StorageDependenciesFinished,
911 ),
912 );
913 self.install_compute_watch_set(
914 conn_id,
915 transitive_compute_deps,
916 ts,
917 WatchSetResponse::StatementDependenciesReady(
918 uuid,
919 StatementLifecycleEvent::ComputeDependenciesFinished,
920 ),
921 )
922 }
923
924 let max_result_size = self.catalog().system_config().max_result_size();
925
926 let resp = self
928 .implement_peek_plan(
929 ctx.extra_mut(),
930 planned_peek,
931 finishing,
932 cluster_id,
933 target_replica,
934 max_result_size,
935 max_query_result_size,
936 )
937 .await?;
938
939 if ctx.session().vars().emit_timestamp_notice() {
940 let explanation =
941 self.explain_timestamp(ctx.session(), cluster_id, &id_bundle, determination);
942 ctx.session()
943 .add_notice(AdapterNotice::QueryTimestamp { explanation });
944 }
945
946 let resp = match plan.copy_to {
947 None => resp,
948 Some(format) => ExecuteResponse::CopyTo {
949 format,
950 resp: Box::new(resp),
951 },
952 };
953 Ok(StageResult::Response(resp))
954 }
955
956 #[instrument]
957 async fn peek_copy_to_preflight(
958 &mut self,
959 copy_to: PeekStageCopyTo,
960 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
961 let connection_context = self.connection_context().clone();
962 Ok(StageResult::Handle(mz_ore::task::spawn(
963 || "peek copy to preflight",
964 async {
965 let sinks = ©_to.global_lir_plan.df_desc().sink_exports;
966 if sinks.len() != 1 {
967 return Err(AdapterError::Internal(
968 "expected exactly one copy to s3 sink".into(),
969 ));
970 }
971 let (sink_id, sink_desc) = sinks
972 .first_key_value()
973 .expect("known to be exactly one copy to s3 sink");
974 match &sink_desc.connection {
975 ComputeSinkConnection::CopyToS3Oneshot(conn) => {
976 mz_storage_types::sinks::s3_oneshot_sink::preflight(
977 connection_context,
978 &conn.aws_connection,
979 &conn.upload_info,
980 conn.connection_id,
981 *sink_id,
982 )
983 .await?;
984 Ok(Box::new(PeekStage::CopyToDataflow(copy_to)))
985 }
986 _ => Err(AdapterError::Internal(
987 "expected copy to s3 oneshot sink".into(),
988 )),
989 }
990 },
991 )))
992 }
993
994 #[instrument]
995 async fn peek_copy_to_dataflow(
996 &mut self,
997 ctx: &ExecuteContext,
998 PeekStageCopyTo {
999 validity: _,
1000 optimizer,
1001 global_lir_plan,
1002 optimization_finished_at,
1003 source_ids,
1004 }: PeekStageCopyTo,
1005 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
1006 if let Some(id) = ctx.extra.contents() {
1007 self.record_statement_lifecycle_event(
1008 &id,
1009 &StatementLifecycleEvent::OptimizationFinished,
1010 optimization_finished_at,
1011 );
1012 }
1013
1014 let sink_id = global_lir_plan.sink_id();
1015 let cluster_id = optimizer.cluster_id();
1016
1017 let (df_desc, df_meta) = global_lir_plan.unapply();
1018
1019 self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
1020
1021 let (tx, rx) = oneshot::channel();
1023 let active_copy_to = ActiveCopyTo {
1024 conn_id: ctx.session().conn_id().clone(),
1025 tx,
1026 cluster_id,
1027 depends_on: source_ids,
1028 };
1029 drop(
1031 self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
1032 .await,
1033 );
1034
1035 self.ship_dataflow(df_desc, cluster_id, None).await;
1037
1038 let span = Span::current();
1039 Ok(StageResult::HandleRetire(mz_ore::task::spawn(
1040 || "peek copy to dataflow",
1041 async {
1042 let res = rx.await;
1043 match res {
1044 Ok(res) => res,
1045 Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
1046 }
1047 }
1048 .instrument(span),
1049 )))
1050 }
1051
1052 #[instrument]
1053 async fn peek_explain_plan(
1054 &self,
1055 session: &Session,
1056 PeekStageExplainPlan {
1057 optimizer,
1058 insights_ctx,
1059 df_meta,
1060 explain_ctx:
1061 ExplainPlanContext {
1062 config,
1063 format,
1064 stage,
1065 desc,
1066 optimizer_trace,
1067 ..
1068 },
1069 ..
1070 }: PeekStageExplainPlan,
1071 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
1072 let desc = desc.expect("RelationDesc for SelectPlan in EXPLAIN mode");
1073
1074 let session_catalog = self.catalog().for_session(session);
1075 let expr_humanizer = {
1076 let transient_items = btreemap! {
1077 optimizer.select_id() => TransientItem::new(
1078 Some(vec![GlobalId::Explain.to_string()]),
1079 Some(desc.iter_names().map(|c| c.to_string()).collect()),
1080 )
1081 };
1082 ExprHumanizerExt::new(transient_items, &session_catalog)
1083 };
1084
1085 let finishing = if optimizer.finishing().is_trivial(desc.arity()) {
1086 None
1087 } else {
1088 Some(optimizer.finishing().clone())
1089 };
1090
1091 let target_cluster = self.catalog().get_cluster(optimizer.cluster_id());
1092 let features = optimizer.config().features.clone();
1093
1094 let rows = optimizer_trace
1095 .into_rows(
1096 format,
1097 &config,
1098 &features,
1099 &expr_humanizer,
1100 finishing,
1101 Some(target_cluster),
1102 df_meta,
1103 stage,
1104 plan::ExplaineeStatementKind::Select,
1105 insights_ctx,
1106 )
1107 .await?;
1108
1109 Ok(StageResult::Response(Self::send_immediate_rows(rows)))
1110 }
1111
1112 #[instrument]
1113 async fn peek_explain_pushdown(
1114 &self,
1115 session: &Session,
1116 stage: PeekStageExplainPushdown,
1117 ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
1118 let as_of = stage.determination.timestamp_context.antichain();
1119 let mz_now = stage
1120 .determination
1121 .timestamp_context
1122 .timestamp()
1123 .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1124 .unwrap_or_else(ResultSpec::value_all);
1125 let fut = self
1126 .render_explain_pushdown_prepare(session, as_of, mz_now, stage.imports)
1127 .await;
1128 let span = Span::current();
1129 Ok(StageResult::HandleRetire(mz_ore::task::spawn(
1130 || "peek explain pushdown",
1131 fut.instrument(span),
1132 )))
1133 }
1134
1135 #[instrument]
1138 pub(super) fn sequence_peek_timestamp(
1139 &mut self,
1140 session: &mut Session,
1141 when: &QueryWhen,
1142 cluster_id: ClusterId,
1143 timeline_context: TimelineContext,
1144 oracle_read_ts: Option<Timestamp>,
1145 source_bundle: &CollectionIdBundle,
1146 source_ids: &BTreeSet<GlobalId>,
1147 real_time_recency_ts: Option<Timestamp>,
1148 requires_linearization: RequireLinearization,
1149 ) -> Result<TimestampDetermination<Timestamp>, AdapterError> {
1150 let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when);
1151 let timedomain_bundle;
1152
1153 let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
1156 Some(
1158 determination @ TimestampDetermination {
1159 timestamp_context: TimestampContext::TimelineTimestamp { .. },
1160 ..
1161 },
1162 ) if in_immediate_multi_stmt_txn => (determination, None),
1163 _ => {
1164 let determine_bundle = if in_immediate_multi_stmt_txn {
1165 timedomain_bundle = self.timedomain_for(
1168 source_ids,
1169 &timeline_context,
1170 session.conn_id(),
1171 cluster_id,
1172 )?;
1173
1174 &timedomain_bundle
1175 } else {
1176 source_bundle
1178 };
1179 let (determination, read_holds) = self.determine_timestamp(
1180 session,
1181 determine_bundle,
1182 when,
1183 cluster_id,
1184 &timeline_context,
1185 oracle_read_ts,
1186 real_time_recency_ts,
1187 )?;
1188 let read_holds = match determination.timestamp_context.timestamp() {
1190 Some(_ts) => Some(read_holds),
1191 None => {
1192 drop(read_holds);
1197 None
1198 }
1199 };
1200 (determination, read_holds)
1201 }
1202 };
1203
1204 if let Some(txn_reads) = self.txn_read_holds.get(session.conn_id()) {
1213 let allowed_id_bundle = txn_reads.id_bundle();
1217
1218 drop(read_holds);
1221
1222 let outside = source_bundle.difference(&allowed_id_bundle);
1223 if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
1225 let valid_names =
1226 self.resolve_collection_id_bundle_names(session, &allowed_id_bundle);
1227 let invalid_names = self.resolve_collection_id_bundle_names(session, &outside);
1228 return Err(AdapterError::RelationOutsideTimeDomain {
1229 relations: invalid_names,
1230 names: valid_names,
1231 });
1232 }
1233 } else if let Some(read_holds) = read_holds {
1234 self.store_transaction_read_holds(session, read_holds);
1235 }
1236
1237 let mut transaction_determination = determination.clone();
1248 if when.is_transactional() {
1249 session.add_transaction_ops(TransactionOps::Peeks {
1250 determination: transaction_determination,
1251 cluster_id,
1252 requires_linearization,
1253 })?;
1254 } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
1255 transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
1257 session.add_transaction_ops(TransactionOps::Peeks {
1258 determination: transaction_determination,
1259 cluster_id,
1260 requires_linearization,
1261 })?;
1262 };
1263
1264 Ok(determination)
1265 }
1266}