Skip to main content

mz_adapter/coord/sequencer/inner/peek.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::sync::Arc;
12
13use mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION;
14use mz_compute_types::sinks::ComputeSinkConnection;
15use mz_controller_types::ClusterId;
16use mz_expr::{CollectionPlan, ResultSpec};
17use mz_ore::cast::CastFrom;
18use mz_ore::instrument;
19use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
20use mz_repr::{Datum, GlobalId, Timestamp};
21use mz_sql::ast::{ExplainStage, Statement};
22use mz_sql::catalog::CatalogCluster;
23// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
24use mz_sql::plan::QueryWhen;
25use mz_sql::plan::{self};
26use mz_sql::session::metadata::SessionMetadata;
27use mz_transform::EmptyStatisticsOracle;
28use tokio::sync::oneshot;
29use tracing::warn;
30use tracing::{Instrument, Span};
31
32use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
33use crate::command::ExecuteResponse;
34use crate::coord::id_bundle::CollectionIdBundle;
35use crate::coord::peek::{self, PeekDataflowPlan, PeekPlan, PlannedPeek};
36use crate::coord::sequencer::inner::return_if_err;
37use crate::coord::sequencer::{check_log_reads, emit_optimizer_notices, eval_copy_to_uri};
38use crate::coord::timeline::{TimelineContext, timedomain_for};
39use crate::coord::timestamp_selection::{
40    TimestampContext, TimestampDetermination, TimestampProvider,
41};
42use crate::coord::{
43    Coordinator, CopyToContext, ExecuteContext, ExplainContext, ExplainPlanContext, Message,
44    PeekStage, PeekStageCopyTo, PeekStageExplainPlan, PeekStageExplainPushdown, PeekStageFinish,
45    PeekStageLinearizeTimestamp, PeekStageOptimize, PeekStageRealTimeRecency,
46    PeekStageTimestampReadHold, PlanValidity, StageResult, Staged, TargetCluster,
47};
48use crate::error::AdapterError;
49use crate::explain::insights::PlanInsightsContext;
50use crate::explain::optimizer_trace::OptimizerTrace;
51use crate::notice::AdapterNotice;
52use crate::optimize;
53use crate::session::{RequireLinearization, Session, TransactionOps, TransactionStatus};
54use crate::statement_logging::StatementLifecycleEvent;
55use crate::statement_logging::WatchSetCreation;
56
57impl Staged for PeekStage {
58    type Ctx = ExecuteContext;
59
60    fn validity(&mut self) -> &mut PlanValidity {
61        match self {
62            PeekStage::LinearizeTimestamp(stage) => &mut stage.validity,
63            PeekStage::RealTimeRecency(stage) => &mut stage.validity,
64            PeekStage::TimestampReadHold(stage) => &mut stage.validity,
65            PeekStage::Optimize(stage) => &mut stage.validity,
66            PeekStage::Finish(stage) => &mut stage.validity,
67            PeekStage::ExplainPlan(stage) => &mut stage.validity,
68            PeekStage::ExplainPushdown(stage) => &mut stage.validity,
69            PeekStage::CopyToPreflight(stage) => &mut stage.validity,
70            PeekStage::CopyToDataflow(stage) => &mut stage.validity,
71        }
72    }
73
74    async fn stage(
75        self,
76        coord: &mut Coordinator,
77        ctx: &mut ExecuteContext,
78    ) -> Result<StageResult<Box<Self>>, AdapterError> {
79        match self {
80            PeekStage::LinearizeTimestamp(stage) => {
81                coord.peek_linearize_timestamp(ctx.session(), stage).await
82            }
83            PeekStage::RealTimeRecency(stage) => {
84                coord.peek_real_time_recency(ctx.session(), stage).await
85            }
86            PeekStage::TimestampReadHold(stage) => {
87                coord.peek_timestamp_read_hold(ctx.session_mut(), stage)
88            }
89            PeekStage::Optimize(stage) => coord.peek_optimize(ctx.session(), stage).await,
90            PeekStage::Finish(stage) => coord.peek_finish(ctx, stage).await,
91            PeekStage::ExplainPlan(stage) => coord.peek_explain_plan(ctx.session(), stage).await,
92            PeekStage::ExplainPushdown(stage) => {
93                coord.peek_explain_pushdown(ctx.session(), stage).await
94            }
95            PeekStage::CopyToPreflight(stage) => coord.peek_copy_to_preflight(stage).await,
96            PeekStage::CopyToDataflow(stage) => coord.peek_copy_to_dataflow(ctx, stage).await,
97        }
98    }
99
100    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
101        Message::PeekStageReady {
102            ctx,
103            span,
104            stage: self,
105        }
106    }
107
108    fn cancel_enabled(&self) -> bool {
109        true
110    }
111}
112
113impl Coordinator {
114    /// Sequence a peek, determining a timestamp and the most efficient dataflow interaction.
115    ///
116    /// Peeks are sequenced by assigning a timestamp for evaluation, and then determining and
117    /// deploying the most efficient evaluation plan. The peek could evaluate to a constant,
118    /// be a simple read out of an existing arrangement, or required a new dataflow to build
119    /// the results to return.
120    #[instrument]
121    pub(crate) async fn sequence_peek(
122        &mut self,
123        ctx: ExecuteContext,
124        plan: plan::SelectPlan,
125        target_cluster: TargetCluster,
126        max_query_result_size: Option<u64>,
127    ) {
128        let explain_ctx = if ctx.session().vars().emit_plan_insights_notice() {
129            let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
130            ExplainContext::PlanInsightsNotice(optimizer_trace)
131        } else {
132            ExplainContext::None
133        };
134
135        let stage = return_if_err!(
136            self.peek_validate(
137                ctx.session(),
138                plan,
139                target_cluster,
140                None,
141                explain_ctx,
142                max_query_result_size
143            ),
144            ctx
145        );
146        self.sequence_staged(ctx, Span::current(), stage).await;
147    }
148
149    #[instrument]
150    pub(crate) async fn sequence_copy_to(
151        &mut self,
152        ctx: ExecuteContext,
153        plan::CopyToPlan {
154            select_plan,
155            desc,
156            to,
157            connection,
158            connection_id,
159            format,
160            max_file_size,
161        }: plan::CopyToPlan,
162        target_cluster: TargetCluster,
163    ) {
164        let uri = return_if_err!(
165            eval_copy_to_uri(to, ctx.session(), self.catalog().state()),
166            ctx
167        );
168
169        let stage = return_if_err!(
170            self.peek_validate(
171                ctx.session(),
172                select_plan,
173                target_cluster,
174                Some(CopyToContext {
175                    desc,
176                    uri,
177                    connection,
178                    connection_id,
179                    format,
180                    max_file_size,
181                    // This will be set in `peek_stage_validate` stage below.
182                    output_batch_count: None,
183                }),
184                ExplainContext::None,
185                Some(ctx.session().vars().max_query_result_size()),
186            ),
187            ctx
188        );
189        self.sequence_staged(ctx, Span::current(), stage).await;
190    }
191
192    #[instrument]
193    pub(crate) async fn explain_peek(
194        &mut self,
195        ctx: ExecuteContext,
196        plan::ExplainPlanPlan {
197            stage,
198            format,
199            config,
200            explainee,
201        }: plan::ExplainPlanPlan,
202        target_cluster: TargetCluster,
203    ) {
204        let plan::Explainee::Statement(stmt) = explainee else {
205            // This is currently asserted in the `sequence_explain_plan` code that
206            // calls this method.
207            unreachable!()
208        };
209        let plan::ExplaineeStatement::Select { broken, plan, desc } = stmt else {
210            // This is currently asserted in the `sequence_explain_plan` code that
211            // calls this method.
212            unreachable!()
213        };
214
215        // Create an OptimizerTrace instance to collect plans emitted when
216        // executing the optimizer pipeline.
217        let optimizer_trace = OptimizerTrace::new(stage.paths());
218
219        let stage = return_if_err!(
220            self.peek_validate(
221                ctx.session(),
222                plan,
223                target_cluster,
224                None,
225                ExplainContext::Plan(ExplainPlanContext {
226                    broken,
227                    config,
228                    format,
229                    stage,
230                    replan: None,
231                    desc: Some(desc),
232                    optimizer_trace,
233                }),
234                Some(ctx.session().vars().max_query_result_size()),
235            ),
236            ctx
237        );
238        self.sequence_staged(ctx, Span::current(), stage).await;
239    }
240
    /// Runs the simple, up-front validation for a `SELECT`, `COPY TO`, or
    /// `EXPLAIN` peek and builds the first stage of the peek pipeline.
    ///
    /// Most validation must be deferred until after any off-thread work; only
    /// the checks below happen here, in this order:
    /// - resolve the target cluster and build the optimizer config from the
    ///   system config, the cluster's feature overrides, and `explain_ctx`;
    /// - reject a cluster without replicas when `explain_ctx.needs_cluster()`;
    /// - build either a `SELECT` optimizer (`copy_to_ctx` is `None`), or a
    ///   `COPY TO` optimizer, setting `copy_to_ctx.output_batch_count` to the
    ///   largest worker count among the cluster's replicas;
    /// - resolve the session's `cluster_replica` variable, if set;
    /// - determine the timeline context of the plan's dependencies;
    /// - run `check_log_reads` and attach its notices to the session;
    /// - capture a [`PlanValidity`] for the plan's dependencies.
    ///
    /// Returns a [`PeekStage::LinearizeTimestamp`] stage on success.
    ///
    /// # Errors
    ///
    /// Returns [`AdapterError::NoClusterReplicasAvailable`] when the cluster has
    /// no replicas but one is needed. Returns [`AdapterError::UnknownClusterReplica`]
    /// when the session names a replica the cluster does not have. Also propagates
    /// errors from target-cluster resolution, timeline validation, the
    /// temporal-function check, and `check_log_reads`.
    ///
    /// # Panics
    ///
    /// Panics if the resolved cluster has no compute instance snapshot.
    #[instrument]
    pub fn peek_validate(
        &self,
        session: &Session,
        plan: mz_sql::plan::SelectPlan,
        target_cluster: TargetCluster,
        copy_to_ctx: Option<CopyToContext>,
        explain_ctx: ExplainContext,
        max_query_result_size: Option<u64>,
    ) -> Result<PeekStage, AdapterError> {
        // Collect optimizer parameters.
        let catalog = self.owned_catalog();
        let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
        let compute_instance = self
            .instance_snapshot(cluster.id())
            .expect("compute instance does not exist");
        // Layer the optimizer config: system config first, then the cluster's
        // feature overrides, then explain-specific overrides (presumably each
        // `override_from` takes precedence over the previous layer — confirm).
        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
            .override_from(&self.catalog.get_cluster(cluster.id()).config.features())
            .override_from(&explain_ctx);

        // A cluster with no replicas is rejected unless the explain context
        // reports that it does not need a cluster.
        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
            return Err(AdapterError::NoClusterReplicasAvailable {
                name: cluster.name.clone(),
                is_managed: cluster.is_managed(),
            });
        }

        // NOTE: transient IDs are allocated here, before the replica and
        // timeline checks below, so they are consumed even if those checks fail.
        let optimizer = match copy_to_ctx {
            None => {
                // Collect optimizer parameters specific to the peek::Optimizer.
                let (_, view_id) = self.allocate_transient_id();
                let (_, index_id) = self.allocate_transient_id();

                // Build an optimizer for this SELECT.
                optimize::PeekOptimizer::Select(optimize::peek::Optimizer::new(
                    Arc::clone(&catalog),
                    compute_instance,
                    plan.finishing.clone(),
                    view_id,
                    index_id,
                    optimizer_config,
                    self.optimizer_metrics(),
                ))
            }
            Some(mut copy_to_ctx) => {
                // Getting the max worker count across replicas
                // and using that value for the number of batches to
                // divide the copy output into.
                // When `workers()` returns `None`, the replica's process count is used.
                let worker_counts = cluster.replicas().map(|r| {
                    let loc = &r.config.location;
                    loc.workers().unwrap_or_else(|| loc.num_processes())
                });
                let max_worker_count = match worker_counts.max() {
                    Some(count) => u64::cast_from(count),
                    // No replicas at all: there is nowhere to run the COPY TO.
                    None => {
                        return Err(AdapterError::NoClusterReplicasAvailable {
                            name: cluster.name.clone(),
                            is_managed: cluster.is_managed(),
                        });
                    }
                };
                copy_to_ctx.output_batch_count = Some(max_worker_count);
                let (_, view_id) = self.allocate_transient_id();
                // Build an optimizer for this COPY TO.
                optimize::PeekOptimizer::CopyTo(optimize::copy_to::Optimizer::new(
                    Arc::clone(&catalog),
                    compute_instance,
                    view_id,
                    copy_to_ctx,
                    optimizer_config,
                    self.optimizer_metrics(),
                ))
            }
        };

        // Resolve the session's `cluster_replica` setting (if any) to a replica ID
        // of the target cluster; naming a replica the cluster lacks is an error.
        // `mut` because `check_log_reads` below receives `&mut target_replica`.
        let target_replica_name = session.vars().cluster_replica();
        let mut target_replica = target_replica_name
            .map(|name| {
                cluster
                    .replica_id(name)
                    .ok_or(AdapterError::UnknownClusterReplica {
                        cluster_name: cluster.name.clone(),
                        replica_name: name.to_string(),
                    })
            })
            .transpose()?;

        let source_ids = plan.source.depends_on();
        let mut timeline_context = self
            .catalog()
            .validate_timeline_context(source_ids.iter().copied())?;
        if matches!(timeline_context, TimelineContext::TimestampIndependent)
            && plan.source.contains_temporal()?
        {
            // If the source IDs are timestamp independent but the query contains temporal functions,
            // then the timeline context needs to be upgraded to timestamp dependent. This is
            // required because `source_ids` doesn't contain functions.
            timeline_context = TimelineContext::TimestampDependent;
        }

        let notices = check_log_reads(
            &catalog,
            cluster,
            &source_ids,
            &mut target_replica,
            session.vars(),
        )?;
        session.add_notices(notices);

        // Record the catalog revision, the plan's dependencies, the cluster, the
        // target replica, and the role metadata; presumably later stages re-check
        // this to detect catalog changes (see `Staged::validity`).
        let dependencies = source_ids
            .iter()
            .map(|id| self.catalog.resolve_item_id(id))
            .collect();
        let validity = PlanValidity::new(
            catalog.transient_revision(),
            dependencies,
            Some(cluster.id()),
            target_replica,
            session.role_metadata().clone(),
        );

        Ok(PeekStage::LinearizeTimestamp(PeekStageLinearizeTimestamp {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            optimizer,
            explain_ctx,
        }))
    }
374
375    /// Possibly linearize a timestamp from a `TimestampOracle`.
376    #[instrument]
377    async fn peek_linearize_timestamp(
378        &self,
379        session: &Session,
380        PeekStageLinearizeTimestamp {
381            validity,
382            source_ids,
383            plan,
384            max_query_result_size,
385            target_replica,
386            timeline_context,
387            optimizer,
388            explain_ctx,
389        }: PeekStageLinearizeTimestamp,
390    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
391        let isolation_level = session.vars().transaction_isolation().clone();
392        let timeline = Coordinator::get_timeline(&timeline_context);
393        let needs_linearized_read_ts =
394            Coordinator::needs_linearized_read_ts(&isolation_level, &plan.when);
395
396        let build_stage = move |oracle_read_ts: Option<Timestamp>| PeekStageRealTimeRecency {
397            validity,
398            plan,
399            max_query_result_size,
400            source_ids,
401            target_replica,
402            timeline_context,
403            oracle_read_ts,
404            optimizer,
405            explain_ctx,
406        };
407
408        match timeline {
409            Some(timeline) if needs_linearized_read_ts => {
410                let oracle = self.get_timestamp_oracle(&timeline);
411
412                // We ship the timestamp oracle off to an async task, so that we
413                // don't block the main task while we wait.
414
415                let span = Span::current();
416                Ok(StageResult::Handle(mz_ore::task::spawn(
417                    || "linearize timestamp",
418                    async move {
419                        let oracle_read_ts = oracle.read_ts().await;
420                        let stage = build_stage(Some(oracle_read_ts));
421                        let stage = PeekStage::RealTimeRecency(stage);
422                        Ok(Box::new(stage))
423                    }
424                    .instrument(span),
425                )))
426            }
427            Some(_) | None => {
428                let stage = build_stage(None);
429                let stage = PeekStage::RealTimeRecency(stage);
430                Ok(StageResult::Immediate(Box::new(stage)))
431            }
432        }
433    }
434
435    /// Determine a read timestamp and create appropriate read holds.
436    #[instrument]
437    fn peek_timestamp_read_hold(
438        &mut self,
439        session: &mut Session,
440        PeekStageTimestampReadHold {
441            mut validity,
442            plan,
443            max_query_result_size,
444            source_ids,
445            target_replica,
446            timeline_context,
447            oracle_read_ts,
448            real_time_recency_ts,
449            optimizer,
450            explain_ctx,
451        }: PeekStageTimestampReadHold,
452    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
453        let cluster_id = optimizer.cluster_id();
454        let id_bundle = self
455            .dataflow_builder(cluster_id)
456            .sufficient_collections(source_ids.iter().copied());
457
458        // Although we have added `sources.depends_on()` to the validity already, also add the
459        // sufficient collections for safety.
460        let item_ids = id_bundle
461            .iter()
462            .map(|id| self.catalog().resolve_item_id(&id));
463        validity.extend_dependencies(item_ids);
464
465        let determination = self.sequence_peek_timestamp(
466            session,
467            &plan.when,
468            cluster_id,
469            timeline_context,
470            oracle_read_ts,
471            &id_bundle,
472            &source_ids,
473            real_time_recency_ts,
474            (&explain_ctx).into(),
475        )?;
476
477        let stage = PeekStage::Optimize(PeekStageOptimize {
478            validity,
479            plan,
480            max_query_result_size,
481            source_ids,
482            id_bundle,
483            target_replica,
484            determination,
485            optimizer,
486            explain_ctx,
487        });
488        Ok(StageResult::Immediate(Box::new(stage)))
489    }
490
    /// Optimizes the peek's plan off the coordinator thread and chooses the
    /// next stage from the optimizer output and the explain context.
    ///
    /// On the coordinator this gathers:
    /// - a statistics oracle, falling back to `EmptyStatisticsOracle` if
    ///   fetching statistics fails;
    /// - the session metadata;
    /// - the catalog's `now` function;
    /// - an owned catalog snapshot;
    /// - when plan insights are requested, compute instance snapshots for every
    ///   user cluster.
    ///
    /// The optimizer then runs inside `spawn_blocking`, and its result is routed to:
    /// - `ExplainPlan` for `ExplainContext::Plan`. This includes `EXPLAIN BROKEN`
    ///   after a failed optimization, in which case default dataflow metadata is used.
    /// - `Finish` for `ExplainContext::PlanInsightsNotice` and `ExplainContext::None`.
    /// - `ExplainPushdown` for `ExplainContext::Pushdown`, carrying the MFP of
    ///   each slow-path source import.
    /// - `CopyToPreflight` for `COPY TO` plans.
    ///
    /// # Errors
    ///
    /// Optimizer errors are returned from the spawned task, except under
    /// `EXPLAIN BROKEN`, where they are logged and the explain proceeds.
    #[instrument]
    async fn peek_optimize(
        &self,
        session: &Session,
        PeekStageOptimize {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            id_bundle,
            target_replica,
            determination,
            mut optimizer,
            explain_ctx,
        }: PeekStageOptimize,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        // Generate data structures that can be moved to another task where we will perform possibly
        // expensive optimizations.
        let timestamp_context = determination.timestamp_context.clone();
        // Statistics are best-effort: on failure, optimize without them.
        let stats = self
            .statistics_oracle(session, &source_ids, &timestamp_context.antichain(), true)
            .await
            .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
        let session = session.meta();
        let now = self.catalog().config().now.clone();
        let catalog = self.owned_catalog();
        let mut compute_instances = BTreeMap::new();
        if explain_ctx.needs_plan_insights() {
            // There's a chance for index skew (indexes were created/deleted between stages) from the
            // original plan, but that seems acceptable for insights.
            for cluster in self.catalog().user_clusters() {
                let snapshot = self.instance_snapshot(cluster.id).expect("must exist");
                compute_instances.insert(cluster.name.clone(), snapshot);
            }
        }

        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
            || "optimize peek",
            move || {
                span.in_scope(|| {
                    // Run the optimization pipeline. The dispatch guard borrows
                    // `explain_ctx`, so we scope it in a block to drop it before
                    // `explain_ctx` is inspected below. A failed optimization is
                    // captured in `pipeline_result` rather than returned, so that
                    // `EXPLAIN BROKEN` can still be handled in the match.
                    let pipeline_result = {
                        let _dispatch_guard = explain_ctx.dispatch_guard();

                        let raw_expr = plan.source.clone();

                        optimizer
                            .optimize(raw_expr, timestamp_context.clone(), &session, stats)
                            .map_err(AdapterError::from)
                    };

                    // Recorded right after optimization and carried into the
                    // `Finish` / `CopyToPreflight` stages.
                    let optimization_finished_at = now();

                    let stage = match pipeline_result {
                        Ok(optimize::PeekGlobalLirPlan::Select(global_lir_plan)) => {
                            // A `Select` LIR plan is expected to come only from the
                            // SELECT/EXPLAIN optimizer variant.
                            let optimizer =
                                optimizer.into_select().expect("a SELECT/EXPLAIN optimizer");
                            // Build a plan insights context only when insights are
                            // requested (e.g. to compute fast path clusters for slow
                            // path plans).
                            let needs_plan_insights = explain_ctx.needs_plan_insights();
                            // Disable anything that uses the optimizer if we only want the notice and
                            // plan optimization took longer than the threshold. This is to prevent a
                            // situation where optimizing takes a while and there are lots of clusters,
                            // which would delay peek execution by the product of those.
                            let opt_limit =
                                PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
                                    .get(catalog.system_config().dyncfgs());
                            let target_instance =
                                catalog.get_cluster(optimizer.cluster_id()).name.clone();
                            let enable_re_optimize =
                                !(matches!(explain_ctx, ExplainContext::PlanInsightsNotice(_))
                                    && optimizer.duration() > opt_limit);
                            let insights_ctx = needs_plan_insights
                                .then(|| PlanInsightsContext {
                                    stmt: plan
                                        .select
                                        .as_deref()
                                        .map(Clone::clone)
                                        .map(Statement::Select),
                                    raw_expr: plan.source.clone(),
                                    catalog,
                                    compute_instances,
                                    target_instance,
                                    metrics: optimizer.metrics().clone(),
                                    finishing: optimizer.finishing().clone(),
                                    optimizer_config: optimizer.config().clone(),
                                    session,
                                    timestamp_context,
                                    view_id: optimizer.select_id(),
                                    index_id: optimizer.index_id(),
                                    enable_re_optimize,
                                })
                                .map(Box::new);
                            match explain_ctx {
                                // `EXPLAIN`: only the dataflow metadata is kept.
                                ExplainContext::Plan(explain_ctx) => {
                                    let (_, df_meta, _) = global_lir_plan.unapply();
                                    PeekStage::ExplainPlan(PeekStageExplainPlan {
                                        validity,
                                        optimizer,
                                        df_meta,
                                        explain_ctx,
                                        insights_ctx,
                                    })
                                }
                                // Regular peek that also emits a plan insights notice.
                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
                                    PeekStage::Finish(PeekStageFinish {
                                        validity,
                                        plan,
                                        max_query_result_size,
                                        id_bundle,
                                        target_replica,
                                        source_ids,
                                        determination,
                                        cluster_id: optimizer.cluster_id(),
                                        finishing: optimizer.finishing().clone(),
                                        plan_insights_optimizer_trace: Some(optimizer_trace),
                                        global_lir_plan,
                                        optimization_finished_at,
                                        insights_ctx,
                                    })
                                }
                                // Regular peek.
                                ExplainContext::None => PeekStage::Finish(PeekStageFinish {
                                    validity,
                                    plan,
                                    max_query_result_size,
                                    id_bundle,
                                    target_replica,
                                    source_ids,
                                    determination,
                                    cluster_id: optimizer.cluster_id(),
                                    finishing: optimizer.finishing().clone(),
                                    plan_insights_optimizer_trace: None,
                                    global_lir_plan,
                                    optimization_finished_at,
                                    insights_ctx,
                                }),
                                // Pushdown explain: collect the MFP attached to each
                                // source import of a slow-path plan; fast-path plans
                                // report no imports.
                                ExplainContext::Pushdown => {
                                    let (plan, _, _) = global_lir_plan.unapply();
                                    let imports = match plan {
                                        PeekPlan::SlowPath(plan) => plan
                                            .desc
                                            .source_imports
                                            .into_iter()
                                            .filter_map(|(id, import)| {
                                                import.desc.arguments.operators.map(|mfp| (id, mfp))
                                            })
                                            .collect(),
                                        PeekPlan::FastPath(_) => BTreeMap::default(),
                                    };
                                    PeekStage::ExplainPushdown(PeekStageExplainPushdown {
                                        validity,
                                        determination,
                                        imports,
                                    })
                                }
                            }
                        }
                        Ok(optimize::PeekGlobalLirPlan::CopyTo(global_lir_plan)) => {
                            let optimizer = optimizer.into_copy_to().expect("a COPY TO optimizer");
                            PeekStage::CopyToPreflight(PeekStageCopyTo {
                                validity,
                                optimizer,
                                global_lir_plan,
                                optimization_finished_at,
                                source_ids,
                            })
                        }
                        // Internal optimizer errors are handled differently
                        // depending on the caller.
                        Err(err) => {
                            let Some(optimizer) = optimizer.into_select() else {
                                // In `COPY TO` contexts, immediately retire the
                                // execution with the error.
                                return Err(err);
                            };
                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
                                // For any explain context other than
                                // `ExplainContext::Plan` (e.g. `sequence_peek`),
                                // immediately retire the execution with the error.
                                return Err(err);
                            };

                            if explain_ctx.broken {
                                // In `EXPLAIN BROKEN` contexts, just log the error
                                // and move to the next stage with default
                                // parameters.
                                tracing::error!("error while handling EXPLAIN statement: {}", err);
                                PeekStage::ExplainPlan(PeekStageExplainPlan {
                                    validity,
                                    optimizer,
                                    df_meta: Default::default(),
                                    explain_ctx,
                                    insights_ctx: None,
                                })
                            } else {
                                // In regular `EXPLAIN` contexts, immediately retire
                                // the execution with the error.
                                return Err(err);
                            }
                        }
                    };
                    Ok(Box::new(stage))
                })
            },
        )))
    }
700
701    #[instrument]
702    async fn peek_real_time_recency(
703        &self,
704        session: &Session,
705        PeekStageRealTimeRecency {
706            validity,
707            plan,
708            max_query_result_size,
709            source_ids,
710            target_replica,
711            timeline_context,
712            oracle_read_ts,
713            optimizer,
714            explain_ctx,
715        }: PeekStageRealTimeRecency,
716    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
717        let fut = self
718            .determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied())
719            .await?;
720
721        match fut {
722            Some(fut) => {
723                let catalog = Arc::clone(&self.catalog);
724                let span = Span::current();
725                Ok(StageResult::Handle(mz_ore::task::spawn(
726                    || "peek real time recency",
727                    async move {
728                        let real_time_recency_ts =
729                            Coordinator::await_real_time_recent_timestamp(catalog, fut).await?;
730                        let stage = PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
731                            validity,
732                            plan,
733                            max_query_result_size,
734                            target_replica,
735                            timeline_context,
736                            source_ids,
737                            optimizer,
738                            explain_ctx,
739                            oracle_read_ts,
740                            real_time_recency_ts: Some(real_time_recency_ts),
741                        });
742                        Ok(Box::new(stage))
743                    }
744                    .instrument(span),
745                )))
746            }
747            None => Ok(StageResult::Immediate(Box::new(
748                PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
749                    validity,
750                    plan,
751                    max_query_result_size,
752                    target_replica,
753                    timeline_context,
754                    source_ids,
755                    optimizer,
756                    explain_ctx,
757                    oracle_read_ts,
758                    real_time_recency_ts: None,
759                }),
760            ))),
761        }
762    }
763
764    #[instrument]
765    async fn peek_finish(
766        &mut self,
767        ctx: &mut ExecuteContext,
768        PeekStageFinish {
769            validity: _,
770            plan,
771            max_query_result_size,
772            id_bundle,
773            target_replica,
774            source_ids,
775            determination,
776            cluster_id,
777            finishing,
778            plan_insights_optimizer_trace,
779            global_lir_plan,
780            optimization_finished_at,
781            insights_ctx,
782        }: PeekStageFinish,
783    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
784        if let Some(id) = ctx.extra.contents() {
785            self.record_statement_lifecycle_event(
786                &id,
787                &StatementLifecycleEvent::OptimizationFinished,
788                optimization_finished_at,
789            );
790        }
791
792        let session = ctx.session_mut();
793        let conn_id = session.conn_id().clone();
794
795        let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
796        let source_arity = typ.arity();
797
798        emit_optimizer_notices(&*self.catalog, &*session, &df_meta.optimizer_notices);
799
800        if let Some(trace) = plan_insights_optimizer_trace {
801            let target_cluster = self.catalog().get_cluster(cluster_id);
802            let features = OptimizerFeatures::from(self.catalog().system_config())
803                .override_from(&target_cluster.config.features());
804            let insights = trace
805                .into_plan_insights(
806                    &features,
807                    &self.catalog().for_session(session),
808                    Some(plan.finishing),
809                    Some(target_cluster),
810                    df_meta,
811                    insights_ctx,
812                )
813                .await?;
814            session.add_notice(AdapterNotice::PlanInsights(insights));
815        }
816
817        let planned_peek = PlannedPeek {
818            plan: peek_plan,
819            determination: determination.clone(),
820            conn_id: conn_id.clone(),
821            intermediate_result_type: typ,
822            source_arity,
823            source_ids,
824        };
825
826        if let Some(transient_index_id) = match &planned_peek.plan {
827            peek::PeekPlan::FastPath(_) => None,
828            peek::PeekPlan::SlowPath(PeekDataflowPlan { id, .. }) => Some(id),
829        } {
830            if let Some(statement_logging_id) = ctx.extra.contents() {
831                self.set_transient_index_id(statement_logging_id, *transient_index_id);
832            }
833        }
834
835        if let Some(logging_id) = ctx.extra().contents() {
836            let watch_set = WatchSetCreation::new(
837                logging_id,
838                self.catalog.state(),
839                &id_bundle,
840                determination.timestamp_context.timestamp_or_default(),
841            );
842            self.install_peek_watch_sets(conn_id.clone(), watch_set).expect("the old peek sequencing re-verifies the dependencies' existence before installing the new watch sets");
843        }
844
845        let max_result_size = self.catalog().system_config().max_result_size();
846
847        // Implement the peek, and capture the response.
848        let resp = self
849            .implement_peek_plan(
850                ctx.extra_mut(),
851                planned_peek,
852                finishing,
853                cluster_id,
854                target_replica,
855                max_result_size,
856                max_query_result_size,
857            )
858            .await?;
859
860        if ctx.session().vars().emit_timestamp_notice() {
861            let explanation = self.explain_timestamp(
862                ctx.session().conn_id(),
863                ctx.session().pcx().wall_time,
864                cluster_id,
865                &id_bundle,
866                determination,
867            );
868            ctx.session()
869                .add_notice(AdapterNotice::QueryTimestamp { explanation });
870        }
871
872        let resp = match plan.copy_to {
873            None => resp,
874            Some(format) => ExecuteResponse::CopyTo {
875                format,
876                resp: Box::new(resp),
877            },
878        };
879        Ok(StageResult::Response(resp))
880    }
881
882    #[instrument]
883    async fn peek_copy_to_preflight(
884        &self,
885        copy_to: PeekStageCopyTo,
886    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
887        let connection_context = self.connection_context().clone();
888        let enforce_external_addresses = mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
889            .get(self.controller.storage.config().config_set());
890        Ok(StageResult::Handle(mz_ore::task::spawn(
891            || "peek copy to preflight",
892            async move {
893                let sinks = &copy_to.global_lir_plan.df_desc().sink_exports;
894                if sinks.len() != 1 {
895                    return Err(AdapterError::Internal(
896                        "expected exactly one copy to s3 sink".into(),
897                    ));
898                }
899                let (sink_id, sink_desc) = sinks
900                    .first_key_value()
901                    .expect("known to be exactly one copy to s3 sink");
902                match &sink_desc.connection {
903                    ComputeSinkConnection::CopyToS3Oneshot(conn) => {
904                        mz_storage_types::sinks::s3_oneshot_sink::preflight(
905                            connection_context,
906                            &conn.aws_connection,
907                            &conn.upload_info,
908                            conn.connection_id,
909                            *sink_id,
910                            enforce_external_addresses,
911                        )
912                        .await?;
913                        Ok(Box::new(PeekStage::CopyToDataflow(copy_to)))
914                    }
915                    _ => Err(AdapterError::Internal(
916                        "expected copy to s3 oneshot sink".into(),
917                    )),
918                }
919            },
920        )))
921    }
922
923    #[instrument]
924    async fn peek_copy_to_dataflow(
925        &mut self,
926        ctx: &ExecuteContext,
927        PeekStageCopyTo {
928            validity: _,
929            optimizer,
930            global_lir_plan,
931            optimization_finished_at,
932            source_ids,
933        }: PeekStageCopyTo,
934    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
935        if let Some(id) = ctx.extra.contents() {
936            self.record_statement_lifecycle_event(
937                &id,
938                &StatementLifecycleEvent::OptimizationFinished,
939                optimization_finished_at,
940            );
941        }
942
943        let sink_id = global_lir_plan.sink_id();
944        let cluster_id = optimizer.cluster_id();
945
946        let (df_desc, df_meta) = global_lir_plan.unapply();
947
948        emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
949
950        // Callback for the active copy to.
951        let (tx, rx) = oneshot::channel();
952        let active_copy_to = ActiveCopyTo {
953            conn_id: ctx.session().conn_id().clone(),
954            tx,
955            cluster_id,
956            depends_on: source_ids,
957        };
958        // Add metadata for the new COPY TO. CopyTo returns a `ready` future, so it is safe to drop.
959        drop(
960            self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
961                .await,
962        );
963
964        // Ship dataflow.
965        self.ship_dataflow(df_desc, cluster_id, None).await;
966
967        let span = Span::current();
968        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
969            || "peek copy to dataflow",
970            async {
971                let res = rx.await;
972                match res {
973                    Ok(res) => res,
974                    Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
975                }
976            }
977            .instrument(span),
978        )))
979    }
980
981    #[instrument]
982    async fn peek_explain_plan(
983        &self,
984        session: &Session,
985        PeekStageExplainPlan {
986            optimizer,
987            insights_ctx,
988            df_meta,
989            explain_ctx,
990            ..
991        }: PeekStageExplainPlan,
992    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
993        let rows = super::super::explain_plan_inner(
994            session,
995            self.catalog(),
996            df_meta,
997            explain_ctx,
998            optimizer,
999            insights_ctx,
1000        )
1001        .await?;
1002
1003        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
1004    }
1005
1006    #[instrument]
1007    async fn peek_explain_pushdown(
1008        &self,
1009        session: &Session,
1010        stage: PeekStageExplainPushdown,
1011    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
1012        let as_of = stage.determination.timestamp_context.antichain();
1013        let mz_now = stage
1014            .determination
1015            .timestamp_context
1016            .timestamp()
1017            .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1018            .unwrap_or_else(ResultSpec::value_all);
1019        let fut = self
1020            .explain_pushdown_future(session, as_of, mz_now, stage.imports)
1021            .await;
1022        let span = Span::current();
1023        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
1024            || "peek explain pushdown",
1025            fut.instrument(span),
1026        )))
1027    }
1028
1029    /// Determines the query timestamp and acquires read holds on dependent sources
1030    /// if necessary.
1031    #[instrument]
1032    pub(super) fn sequence_peek_timestamp(
1033        &mut self,
1034        session: &mut Session,
1035        when: &QueryWhen,
1036        cluster_id: ClusterId,
1037        timeline_context: TimelineContext,
1038        oracle_read_ts: Option<Timestamp>,
1039        source_bundle: &CollectionIdBundle,
1040        source_ids: &BTreeSet<GlobalId>,
1041        real_time_recency_ts: Option<Timestamp>,
1042        requires_linearization: RequireLinearization,
1043    ) -> Result<TimestampDetermination, AdapterError> {
1044        let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when);
1045        let timedomain_bundle;
1046
1047        // Fetch or generate a timestamp for this query and what the read holds would be if we need to set
1048        // them.
1049        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
1050            // Use the transaction's timestamp if it exists and this isn't an AS OF query.
1051            Some(
1052                determination @ TimestampDetermination {
1053                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
1054                    ..
1055                },
1056            ) if in_immediate_multi_stmt_txn => (determination, None),
1057            _ => {
1058                let determine_bundle = if in_immediate_multi_stmt_txn {
1059                    // In a transaction, determine a timestamp that will be valid for anything in
1060                    // any schema referenced by the first query.
1061                    timedomain_bundle = timedomain_for(
1062                        self.catalog(),
1063                        &self.index_oracle(cluster_id),
1064                        source_ids,
1065                        &timeline_context,
1066                        session.conn_id(),
1067                        cluster_id,
1068                    )?;
1069
1070                    &timedomain_bundle
1071                } else {
1072                    // If not in a transaction, use the source.
1073                    source_bundle
1074                };
1075                let (determination, read_holds) = self.determine_timestamp(
1076                    session,
1077                    determine_bundle,
1078                    when,
1079                    cluster_id,
1080                    &timeline_context,
1081                    oracle_read_ts,
1082                    real_time_recency_ts,
1083                )?;
1084                // We only need read holds if the read depends on a timestamp.
1085                let read_holds = match determination.timestamp_context.timestamp() {
1086                    Some(_ts) => Some(read_holds),
1087                    None => {
1088                        // We don't need the read holds and shouldn't add them
1089                        // to the txn.
1090                        //
1091                        // TODO: Handle this within determine_timestamp.
1092                        drop(read_holds);
1093                        None
1094                    }
1095                };
1096                (determination, read_holds)
1097            }
1098        };
1099
1100        // Always either verify the current statement ids are within the existing
1101        // transaction's read hold set (timedomain), or create the read holds if this is the
1102        // first statement in a transaction (or this is a single statement transaction).
1103        // This must happen even if this is an `AS OF` query as well. There are steps after
1104        // this that happen off thread, so no matter the kind of statement or transaction,
1105        // we must acquire read holds here so they are held until the off-thread work
1106        // returns to the coordinator.
1107
1108        if let Some(txn_reads) = self.txn_read_holds.get(session.conn_id()) {
1109            // Find referenced ids not in the read hold. A reference could be caused by a
1110            // user specifying an object in a different schema than the first query. An
1111            // index could be caused by a CREATE INDEX after the transaction started.
1112            let allowed_id_bundle = txn_reads.id_bundle();
1113
1114            // We don't need the read holds that determine_timestamp acquired
1115            // for us.
1116            drop(read_holds);
1117
1118            let outside = source_bundle.difference(&allowed_id_bundle);
1119            // Queries without a timestamp and timeline can belong to any existing timedomain.
1120            if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
1121                let valid_names =
1122                    allowed_id_bundle.resolve_names(self.catalog(), session.conn_id());
1123                let invalid_names = outside.resolve_names(self.catalog(), session.conn_id());
1124                return Err(AdapterError::RelationOutsideTimeDomain {
1125                    relations: invalid_names,
1126                    names: valid_names,
1127                });
1128            }
1129        } else if let Some(read_holds) = read_holds {
1130            self.store_transaction_read_holds(session.conn_id().clone(), read_holds);
1131        }
1132
1133        // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
1134        // arbitrary and we don't recall why we did it (possibly an error!). Change this to always
1135        // set the transaction ops. Decide and document what our policy should be on AS OF queries.
1136        // Maybe they shouldn't be allowed in transactions at all because it's hard to explain
1137        // what's going on there. This should probably get a small design document.
1138
1139        // We only track the peeks in the session if the query doesn't use AS
1140        // OF or we're inside an explicit transaction. The latter case is
1141        // necessary to support PG's `BEGIN` semantics, whose behavior can
1142        // depend on whether or not reads have occurred in the txn.
1143        let mut transaction_determination = determination.clone();
1144        if when.is_transactional() {
1145            session.add_transaction_ops(TransactionOps::Peeks {
1146                determination: transaction_determination,
1147                cluster_id,
1148                requires_linearization,
1149            })?;
1150        } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
1151            // If the query uses AS OF, then ignore the timestamp.
1152            transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
1153            session.add_transaction_ops(TransactionOps::Peeks {
1154                determination: transaction_determination,
1155                cluster_id,
1156                requires_linearization,
1157            })?;
1158        };
1159
1160        Ok(determination)
1161    }
1162}