Skip to main content

mz_adapter/coord/sequencer/inner/
peek.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::sync::Arc;
12
13use itertools::Either;
14use mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION;
15use mz_compute_types::sinks::ComputeSinkConnection;
16use mz_controller_types::ClusterId;
17use mz_expr::{CollectionPlan, ResultSpec};
18use mz_ore::cast::CastFrom;
19use mz_ore::instrument;
20use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
21use mz_repr::{Datum, GlobalId, Timestamp};
22use mz_sql::ast::{ExplainStage, Statement};
23use mz_sql::catalog::CatalogCluster;
24// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
25use mz_sql::plan::QueryWhen;
26use mz_sql::plan::{self};
27use mz_sql::session::metadata::SessionMetadata;
28use mz_transform::EmptyStatisticsOracle;
29use tokio::sync::oneshot;
30use tracing::warn;
31use tracing::{Instrument, Span};
32
33use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
34use crate::command::ExecuteResponse;
35use crate::coord::id_bundle::CollectionIdBundle;
36use crate::coord::peek::{self, PeekDataflowPlan, PeekPlan, PlannedPeek};
37use crate::coord::sequencer::inner::return_if_err;
38use crate::coord::sequencer::{check_log_reads, emit_optimizer_notices, eval_copy_to_uri};
39use crate::coord::timeline::{TimelineContext, timedomain_for};
40use crate::coord::timestamp_selection::{
41    TimestampContext, TimestampDetermination, TimestampProvider,
42};
43use crate::coord::{
44    Coordinator, CopyToContext, ExecuteContext, ExplainContext, ExplainPlanContext, Message,
45    PeekStage, PeekStageCopyTo, PeekStageExplainPlan, PeekStageExplainPushdown, PeekStageFinish,
46    PeekStageLinearizeTimestamp, PeekStageOptimize, PeekStageRealTimeRecency,
47    PeekStageTimestampReadHold, PlanValidity, StageResult, Staged, TargetCluster,
48};
49use crate::error::AdapterError;
50use crate::explain::insights::PlanInsightsContext;
51use crate::explain::optimizer_trace::OptimizerTrace;
52use crate::notice::AdapterNotice;
53use crate::optimize::{self, Optimize};
54use crate::session::{RequireLinearization, Session, TransactionOps, TransactionStatus};
55use crate::statement_logging::StatementLifecycleEvent;
56use crate::statement_logging::WatchSetCreation;
57
/// Drives a peek (`SELECT`, `EXPLAIN`, or `COPY TO`) through the
/// coordinator's staged-execution machinery: each stage runs via
/// [`Self::stage`], and off-thread results re-enter the main task via the
/// message built in [`Self::message`].
impl Staged for PeekStage {
    type Ctx = ExecuteContext;

    /// Returns the stage's validity token, which the sequencer re-checks
    /// between stages to ensure the plan's catalog dependencies, cluster, and
    /// replica are still valid.
    fn validity(&mut self) -> &mut PlanValidity {
        match self {
            PeekStage::LinearizeTimestamp(stage) => &mut stage.validity,
            PeekStage::RealTimeRecency(stage) => &mut stage.validity,
            PeekStage::TimestampReadHold(stage) => &mut stage.validity,
            PeekStage::Optimize(stage) => &mut stage.validity,
            PeekStage::Finish(stage) => &mut stage.validity,
            PeekStage::ExplainPlan(stage) => &mut stage.validity,
            PeekStage::ExplainPushdown(stage) => &mut stage.validity,
            PeekStage::CopyToPreflight(stage) => &mut stage.validity,
            PeekStage::CopyToDataflow(stage) => &mut stage.validity,
        }
    }

    /// Executes the current stage by dispatching to the corresponding
    /// `Coordinator` method, returning either the next stage immediately or a
    /// handle to a task that will produce it.
    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut ExecuteContext,
    ) -> Result<StageResult<Box<Self>>, AdapterError> {
        match self {
            PeekStage::LinearizeTimestamp(stage) => {
                coord.peek_linearize_timestamp(ctx.session(), stage).await
            }
            PeekStage::RealTimeRecency(stage) => {
                coord.peek_real_time_recency(ctx.session(), stage).await
            }
            PeekStage::TimestampReadHold(stage) => {
                coord.peek_timestamp_read_hold(ctx.session_mut(), stage)
            }
            PeekStage::Optimize(stage) => coord.peek_optimize(ctx.session(), stage).await,
            PeekStage::Finish(stage) => coord.peek_finish(ctx, stage).await,
            PeekStage::ExplainPlan(stage) => coord.peek_explain_plan(ctx.session(), stage).await,
            PeekStage::ExplainPushdown(stage) => {
                coord.peek_explain_pushdown(ctx.session(), stage).await
            }
            PeekStage::CopyToPreflight(stage) => coord.peek_copy_to_preflight(stage).await,
            PeekStage::CopyToDataflow(stage) => coord.peek_copy_to_dataflow(ctx, stage).await,
        }
    }

    /// Wraps a ready stage in the coordinator message that resumes
    /// processing on the main task.
    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
        Message::PeekStageReady {
            ctx,
            span,
            stage: self,
        }
    }

    /// Peeks may be canceled between any two stages.
    fn cancel_enabled(&self) -> bool {
        true
    }
}
113
114impl Coordinator {
115    /// Sequence a peek, determining a timestamp and the most efficient dataflow interaction.
116    ///
117    /// Peeks are sequenced by assigning a timestamp for evaluation, and then determining and
118    /// deploying the most efficient evaluation plan. The peek could evaluate to a constant,
119    /// be a simple read out of an existing arrangement, or required a new dataflow to build
120    /// the results to return.
121    #[instrument]
122    pub(crate) async fn sequence_peek(
123        &mut self,
124        ctx: ExecuteContext,
125        plan: plan::SelectPlan,
126        target_cluster: TargetCluster,
127        max_query_result_size: Option<u64>,
128    ) {
129        let explain_ctx = if ctx.session().vars().emit_plan_insights_notice() {
130            let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
131            ExplainContext::PlanInsightsNotice(optimizer_trace)
132        } else {
133            ExplainContext::None
134        };
135
136        let stage = return_if_err!(
137            self.peek_validate(
138                ctx.session(),
139                plan,
140                target_cluster,
141                None,
142                explain_ctx,
143                max_query_result_size
144            ),
145            ctx
146        );
147        self.sequence_staged(ctx, Span::current(), stage).await;
148    }
149
150    #[instrument]
151    pub(crate) async fn sequence_copy_to(
152        &mut self,
153        ctx: ExecuteContext,
154        plan::CopyToPlan {
155            select_plan,
156            desc,
157            to,
158            connection,
159            connection_id,
160            format,
161            max_file_size,
162        }: plan::CopyToPlan,
163        target_cluster: TargetCluster,
164    ) {
165        let uri = return_if_err!(
166            eval_copy_to_uri(to, ctx.session(), self.catalog().state()),
167            ctx
168        );
169
170        let stage = return_if_err!(
171            self.peek_validate(
172                ctx.session(),
173                select_plan,
174                target_cluster,
175                Some(CopyToContext {
176                    desc,
177                    uri,
178                    connection,
179                    connection_id,
180                    format,
181                    max_file_size,
182                    // This will be set in `peek_stage_validate` stage below.
183                    output_batch_count: None,
184                }),
185                ExplainContext::None,
186                Some(ctx.session().vars().max_query_result_size()),
187            ),
188            ctx
189        );
190        self.sequence_staged(ctx, Span::current(), stage).await;
191    }
192
193    #[instrument]
194    pub(crate) async fn explain_peek(
195        &mut self,
196        ctx: ExecuteContext,
197        plan::ExplainPlanPlan {
198            stage,
199            format,
200            config,
201            explainee,
202        }: plan::ExplainPlanPlan,
203        target_cluster: TargetCluster,
204    ) {
205        let plan::Explainee::Statement(stmt) = explainee else {
206            // This is currently asserted in the `sequence_explain_plan` code that
207            // calls this method.
208            unreachable!()
209        };
210        let plan::ExplaineeStatement::Select { broken, plan, desc } = stmt else {
211            // This is currently asserted in the `sequence_explain_plan` code that
212            // calls this method.
213            unreachable!()
214        };
215
216        // Create an OptimizerTrace instance to collect plans emitted when
217        // executing the optimizer pipeline.
218        let optimizer_trace = OptimizerTrace::new(stage.paths());
219
220        let stage = return_if_err!(
221            self.peek_validate(
222                ctx.session(),
223                plan,
224                target_cluster,
225                None,
226                ExplainContext::Plan(ExplainPlanContext {
227                    broken,
228                    config,
229                    format,
230                    stage,
231                    replan: None,
232                    desc: Some(desc),
233                    optimizer_trace,
234                }),
235                Some(ctx.session().vars().max_query_result_size()),
236            ),
237            ctx
238        );
239        self.sequence_staged(ctx, Span::current(), stage).await;
240    }
241
    /// Do some simple validation. We must defer most of it until after any off-thread work.
    ///
    /// Resolves the target cluster, builds the appropriate optimizer flavor
    /// (peek for `SELECT`, copy-to when `copy_to_ctx` is provided), validates
    /// the replica selection and timeline context, and packages everything
    /// into the first peek stage ([`PeekStageLinearizeTimestamp`]).
    ///
    /// # Errors
    /// Returns an error when the target cluster/replica cannot be resolved,
    /// when no replicas are available but the query needs one, or when the
    /// timeline context is invalid for the query's sources.
    #[instrument]
    pub fn peek_validate(
        &self,
        session: &Session,
        plan: mz_sql::plan::SelectPlan,
        target_cluster: TargetCluster,
        copy_to_ctx: Option<CopyToContext>,
        explain_ctx: ExplainContext,
        max_query_result_size: Option<u64>,
    ) -> Result<PeekStage, AdapterError> {
        // Collect optimizer parameters.
        let catalog = self.owned_catalog();
        let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
        let compute_instance = self
            .instance_snapshot(cluster.id())
            .expect("compute instance does not exist");
        // Layer cluster-level feature overrides and explain-context overrides
        // on top of the system defaults.
        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
            .override_from(&self.catalog.get_cluster(cluster.id()).config.features())
            .override_from(&explain_ctx);

        // Reject early when the query needs a live cluster but no replica
        // exists to serve it.
        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
            return Err(AdapterError::NoClusterReplicasAvailable {
                name: cluster.name.clone(),
                is_managed: cluster.is_managed(),
            });
        }

        let optimizer = match copy_to_ctx {
            None => {
                // Collect optimizer parameters specific to the peek::Optimizer.
                let (_, view_id) = self.allocate_transient_id();
                let (_, index_id) = self.allocate_transient_id();

                // Build an optimizer for this SELECT.
                Either::Left(optimize::peek::Optimizer::new(
                    Arc::clone(&catalog),
                    compute_instance,
                    plan.finishing.clone(),
                    view_id,
                    index_id,
                    optimizer_config,
                    self.optimizer_metrics(),
                ))
            }
            Some(mut copy_to_ctx) => {
                // Getting the max worker count across replicas
                // and using that value for the number of batches to
                // divide the copy output into.
                let worker_counts = cluster.replicas().map(|r| {
                    let loc = &r.config.location;
                    loc.workers().unwrap_or_else(|| loc.num_processes())
                });
                let max_worker_count = match worker_counts.max() {
                    Some(count) => u64::cast_from(count),
                    // No replicas at all: a COPY TO cannot run.
                    None => {
                        return Err(AdapterError::NoClusterReplicasAvailable {
                            name: cluster.name.clone(),
                            is_managed: cluster.is_managed(),
                        });
                    }
                };
                copy_to_ctx.output_batch_count = Some(max_worker_count);
                let (_, view_id) = self.allocate_transient_id();
                // Build an optimizer for this COPY TO.
                Either::Right(optimize::copy_to::Optimizer::new(
                    Arc::clone(&catalog),
                    compute_instance,
                    view_id,
                    copy_to_ctx,
                    optimizer_config,
                    self.optimizer_metrics(),
                ))
            }
        };

        // Honor an explicit session-level replica selection, if any.
        let target_replica_name = session.vars().cluster_replica();
        let mut target_replica = target_replica_name
            .map(|name| {
                cluster
                    .replica_id(name)
                    .ok_or(AdapterError::UnknownClusterReplica {
                        cluster_name: cluster.name.clone(),
                        replica_name: name.to_string(),
                    })
            })
            .transpose()?;

        let source_ids = plan.source.depends_on();
        let mut timeline_context = self
            .catalog()
            .validate_timeline_context(source_ids.iter().copied())?;
        if matches!(timeline_context, TimelineContext::TimestampIndependent)
            && plan.source.contains_temporal()?
        {
            // If the source IDs are timestamp independent but the query contains temporal functions,
            // then the timeline context needs to be upgraded to timestamp dependent. This is
            // required because `source_ids` doesn't contain functions.
            timeline_context = TimelineContext::TimestampDependent;
        }

        // Reading from introspection logs may force a specific replica and
        // produces notices for the user.
        let notices = check_log_reads(
            &catalog,
            cluster,
            &source_ids,
            &mut target_replica,
            session.vars(),
        )?;
        session.add_notices(notices);

        let dependencies = source_ids
            .iter()
            .map(|id| self.catalog.resolve_item_id(id))
            .collect();
        let validity = PlanValidity::new(
            catalog.transient_revision(),
            dependencies,
            Some(cluster.id()),
            target_replica,
            session.role_metadata().clone(),
        );

        Ok(PeekStage::LinearizeTimestamp(PeekStageLinearizeTimestamp {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            optimizer,
            explain_ctx,
        }))
    }
375
376    /// Possibly linearize a timestamp from a `TimestampOracle`.
377    #[instrument]
378    async fn peek_linearize_timestamp(
379        &self,
380        session: &Session,
381        PeekStageLinearizeTimestamp {
382            validity,
383            source_ids,
384            plan,
385            max_query_result_size,
386            target_replica,
387            timeline_context,
388            optimizer,
389            explain_ctx,
390        }: PeekStageLinearizeTimestamp,
391    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
392        let isolation_level = session.vars().transaction_isolation().clone();
393        let timeline = Coordinator::get_timeline(&timeline_context);
394        let needs_linearized_read_ts =
395            Coordinator::needs_linearized_read_ts(&isolation_level, &plan.when);
396
397        let build_stage = move |oracle_read_ts: Option<Timestamp>| PeekStageRealTimeRecency {
398            validity,
399            plan,
400            max_query_result_size,
401            source_ids,
402            target_replica,
403            timeline_context,
404            oracle_read_ts,
405            optimizer,
406            explain_ctx,
407        };
408
409        match timeline {
410            Some(timeline) if needs_linearized_read_ts => {
411                let oracle = self.get_timestamp_oracle(&timeline);
412
413                // We ship the timestamp oracle off to an async task, so that we
414                // don't block the main task while we wait.
415
416                let span = Span::current();
417                Ok(StageResult::Handle(mz_ore::task::spawn(
418                    || "linearize timestamp",
419                    async move {
420                        let oracle_read_ts = oracle.read_ts().await;
421                        let stage = build_stage(Some(oracle_read_ts));
422                        let stage = PeekStage::RealTimeRecency(stage);
423                        Ok(Box::new(stage))
424                    }
425                    .instrument(span),
426                )))
427            }
428            Some(_) | None => {
429                let stage = build_stage(None);
430                let stage = PeekStage::RealTimeRecency(stage);
431                Ok(StageResult::Immediate(Box::new(stage)))
432            }
433        }
434    }
435
436    /// Determine a read timestamp and create appropriate read holds.
437    #[instrument]
438    fn peek_timestamp_read_hold(
439        &mut self,
440        session: &mut Session,
441        PeekStageTimestampReadHold {
442            mut validity,
443            plan,
444            max_query_result_size,
445            source_ids,
446            target_replica,
447            timeline_context,
448            oracle_read_ts,
449            real_time_recency_ts,
450            optimizer,
451            explain_ctx,
452        }: PeekStageTimestampReadHold,
453    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
454        let cluster_id = match optimizer.as_ref() {
455            Either::Left(optimizer) => optimizer.cluster_id(),
456            Either::Right(optimizer) => optimizer.cluster_id(),
457        };
458        let id_bundle = self
459            .dataflow_builder(cluster_id)
460            .sufficient_collections(source_ids.iter().copied());
461
462        // Although we have added `sources.depends_on()` to the validity already, also add the
463        // sufficient collections for safety.
464        let item_ids = id_bundle
465            .iter()
466            .map(|id| self.catalog().resolve_item_id(&id));
467        validity.extend_dependencies(item_ids);
468
469        let determination = self.sequence_peek_timestamp(
470            session,
471            &plan.when,
472            cluster_id,
473            timeline_context,
474            oracle_read_ts,
475            &id_bundle,
476            &source_ids,
477            real_time_recency_ts,
478            (&explain_ctx).into(),
479        )?;
480
481        let stage = PeekStage::Optimize(PeekStageOptimize {
482            validity,
483            plan,
484            max_query_result_size,
485            source_ids,
486            id_bundle,
487            target_replica,
488            determination,
489            optimizer,
490            explain_ctx,
491        });
492        Ok(StageResult::Immediate(Box::new(stage)))
493    }
494
    /// Runs the (possibly expensive) optimization pipeline for a peek on a
    /// blocking task, then selects the follow-up stage based on the optimizer
    /// flavor (`SELECT` vs. `COPY TO`) and the explain context.
    ///
    /// On optimizer errors the behavior depends on the caller: `EXPLAIN
    /// BROKEN` logs and continues with defaults, all other contexts retire
    /// the execution with the error.
    #[instrument]
    async fn peek_optimize(
        &self,
        session: &Session,
        PeekStageOptimize {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            id_bundle,
            target_replica,
            determination,
            mut optimizer,
            explain_ctx,
        }: PeekStageOptimize,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        // Generate data structures that can be moved to another task where we will perform possibly
        // expensive optimizations.
        let timestamp_context = determination.timestamp_context.clone();
        // Fall back to the empty oracle if statistics are unavailable; the
        // optimizer tolerates missing stats.
        let stats = self
            .statistics_oracle(session, &source_ids, &timestamp_context.antichain(), true)
            .await
            .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
        let session = session.meta();
        let now = self.catalog().config().now.clone();
        let catalog = self.owned_catalog();
        let mut compute_instances = BTreeMap::new();
        if explain_ctx.needs_plan_insights() {
            // There's a chance for index skew (indexes were created/deleted between stages) from the
            // original plan, but that seems acceptable for insights.
            for cluster in self.catalog().user_clusters() {
                let snapshot = self.instance_snapshot(cluster.id).expect("must exist");
                compute_instances.insert(cluster.name.clone(), snapshot);
            }
        }

        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
            || "optimize peek",
            move || {
                span.in_scope(|| {
                    // The full HIR ⇒ MIR ⇒ LIR pipeline for whichever
                    // optimizer flavor this peek uses.
                    let pipeline = || -> Result<Either<optimize::peek::GlobalLirPlan, optimize::copy_to::GlobalLirPlan>, AdapterError> {
                        let _dispatch_guard = explain_ctx.dispatch_guard();

                        let raw_expr = plan.source.clone();

                        match optimizer.as_mut() {
                            // Optimize SELECT statement.
                            Either::Left(optimizer) => {
                                // HIR ⇒ MIR lowering and MIR optimization (local)
                                let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
                                // Attach resolved context required to continue the pipeline.
                                let local_mir_plan = local_mir_plan.resolve(timestamp_context.clone(), &session, stats);
                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
                                let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;

                                Ok(Either::Left(global_lir_plan))
                            }
                            // Optimize COPY TO statement.
                            Either::Right(optimizer) => {
                                // HIR ⇒ MIR lowering and MIR optimization (local)
                                let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
                                // Attach resolved context required to continue the pipeline.
                                let local_mir_plan = local_mir_plan.resolve(timestamp_context.clone(), &session, stats);
                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
                                let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;

                                Ok(Either::Right(global_lir_plan))
                            }
                        }
                    };

                    let pipeline_result = pipeline();
                    let optimization_finished_at = now();

                    let stage = match pipeline_result {
                        Ok(Either::Left(global_lir_plan)) => {
                            let optimizer = optimizer.unwrap_left();
                            // Enable fast path cluster calculation for slow path plans.
                            let needs_plan_insights = explain_ctx.needs_plan_insights();
                            // Disable anything that uses the optimizer if we only want the notice and
                            // plan optimization took longer than the threshold. This is to prevent a
                            // situation where optimizing takes a while and there a lots of clusters,
                            // which would delay peek execution by the product of those.
                            let opt_limit = PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
                                .get(catalog.system_config().dyncfgs());
                            let target_instance = catalog
                                .get_cluster(optimizer.cluster_id())
                                .name
                                .clone();
                            let enable_re_optimize =
                                !(matches!(explain_ctx, ExplainContext::PlanInsightsNotice(_))
                                    && optimizer.duration() > opt_limit);
                            let insights_ctx = needs_plan_insights.then(|| PlanInsightsContext {
                                stmt: plan.select.as_deref().map(Clone::clone).map(Statement::Select),
                                raw_expr: plan.source.clone(),
                                catalog,
                                compute_instances,
                                target_instance,
                                metrics: optimizer.metrics().clone(),
                                finishing: optimizer.finishing().clone(),
                                optimizer_config: optimizer.config().clone(),
                                session,
                                timestamp_context,
                                view_id: optimizer.select_id(),
                                index_id: optimizer.index_id(),
                                enable_re_optimize,
                            }).map(Box::new);
                            // Route to the follow-up stage the explain context
                            // asks for.
                            match explain_ctx {
                                ExplainContext::Plan(explain_ctx) => {
                                    let (_, df_meta, _) = global_lir_plan.unapply();
                                    PeekStage::ExplainPlan(PeekStageExplainPlan {
                                        validity,
                                        optimizer,
                                        df_meta,
                                        explain_ctx,
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
                                    PeekStage::Finish(PeekStageFinish {
                                        validity,
                                        plan,
                                        max_query_result_size,
                                        id_bundle,
                                        target_replica,
                                        source_ids,
                                        determination,
                                        cluster_id: optimizer.cluster_id(),
                                        finishing: optimizer.finishing().clone(),
                                        plan_insights_optimizer_trace: Some(optimizer_trace),
                                        global_lir_plan,
                                        optimization_finished_at,
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::None => PeekStage::Finish(PeekStageFinish {
                                    validity,
                                    plan,
                                    max_query_result_size,
                                    id_bundle,
                                    target_replica,
                                    source_ids,
                                    determination,
                                    cluster_id: optimizer.cluster_id(),
                                    finishing: optimizer.finishing().clone(),
                                    plan_insights_optimizer_trace: None,
                                    global_lir_plan,
                                    optimization_finished_at,
                                    insights_ctx,
                                }),
                                ExplainContext::Pushdown => {
                                    // Collect the per-source MFPs from slow-path
                                    // plans; fast-path plans have no source
                                    // imports to report.
                                    let (plan, _, _) = global_lir_plan.unapply();
                                    let imports = match plan {
                                        PeekPlan::SlowPath(plan) => plan
                                            .desc
                                            .source_imports
                                            .into_iter()
                                            .filter_map(|(id, import)| {
                                                import.desc
                                                    .arguments
                                                    .operators
                                                    .map(|mfp| (id, mfp))
                                            })
                                            .collect(),
                                        PeekPlan::FastPath(_) => BTreeMap::default(),
                                    };
                                    PeekStage::ExplainPushdown(PeekStageExplainPushdown {
                                        validity,
                                        determination,
                                        imports,
                                    })
                                }
                            }
                        }
                        Ok(Either::Right(global_lir_plan)) => {
                            let optimizer = optimizer.unwrap_right();
                            PeekStage::CopyToPreflight(PeekStageCopyTo {
                                validity,
                                optimizer,
                                global_lir_plan,
                                optimization_finished_at,
                                source_ids,
                            })
                        }
                        // Internal optimizer errors are handled differently
                        // depending on the caller.
                        Err(err) => {
                            let Some(optimizer) = optimizer.left() else {
                                // In `COPY TO` contexts, immediately retire the
                                // execution with the error.
                                return Err(err);
                            };
                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
                                // In `sequence_~` contexts, immediately retire the
                                // execution with the error.
                                return Err(err);
                            };

                            if explain_ctx.broken {
                                // In `EXPLAIN BROKEN` contexts, just log the error
                                // and move to the next stage with default
                                // parameters.
                                tracing::error!("error while handling EXPLAIN statement: {}", err);
                                PeekStage::ExplainPlan(PeekStageExplainPlan {
                                    validity,
                                    optimizer,
                                    df_meta: Default::default(),
                                    explain_ctx,
                                    insights_ctx: None,
                                })
                            } else {
                                // In regular `EXPLAIN` contexts, immediately retire
                                // the execution with the error.
                                return Err(err);
                            }
                        }
                    };
                    Ok(Box::new(stage))
                })
            },
        )))
    }
718
719    #[instrument]
720    async fn peek_real_time_recency(
721        &self,
722        session: &Session,
723        PeekStageRealTimeRecency {
724            validity,
725            plan,
726            max_query_result_size,
727            source_ids,
728            target_replica,
729            timeline_context,
730            oracle_read_ts,
731            optimizer,
732            explain_ctx,
733        }: PeekStageRealTimeRecency,
734    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
735        let fut = self
736            .determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied())
737            .await?;
738
739        match fut {
740            Some(fut) => {
741                let span = Span::current();
742                Ok(StageResult::Handle(mz_ore::task::spawn(
743                    || "peek real time recency",
744                    async move {
745                        let real_time_recency_ts = fut.await?;
746                        let stage = PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
747                            validity,
748                            plan,
749                            max_query_result_size,
750                            target_replica,
751                            timeline_context,
752                            source_ids,
753                            optimizer,
754                            explain_ctx,
755                            oracle_read_ts,
756                            real_time_recency_ts: Some(real_time_recency_ts),
757                        });
758                        Ok(Box::new(stage))
759                    }
760                    .instrument(span),
761                )))
762            }
763            None => Ok(StageResult::Immediate(Box::new(
764                PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
765                    validity,
766                    plan,
767                    max_query_result_size,
768                    target_replica,
769                    timeline_context,
770                    source_ids,
771                    optimizer,
772                    explain_ctx,
773                    oracle_read_ts,
774                    real_time_recency_ts: None,
775                }),
776            ))),
777        }
778    }
779
    /// Final stage of peek sequencing: records statement-lifecycle metadata,
    /// emits optimizer (and optionally plan-insights / timestamp) notices,
    /// installs statement-logging watch sets, and executes the peek itself,
    /// producing the `ExecuteResponse` to return to the client.
    #[instrument]
    async fn peek_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        PeekStageFinish {
            validity: _,
            plan,
            max_query_result_size,
            id_bundle,
            target_replica,
            source_ids,
            determination,
            cluster_id,
            finishing,
            plan_insights_optimizer_trace,
            global_lir_plan,
            optimization_finished_at,
            insights_ctx,
        }: PeekStageFinish,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        // Statement logging: optimization is complete once we reach this stage.
        if let Some(id) = ctx.extra.contents() {
            self.record_statement_lifecycle_event(
                &id,
                &StatementLifecycleEvent::OptimizationFinished,
                optimization_finished_at,
            );
        }

        let session = ctx.session_mut();
        let conn_id = session.conn_id().clone();

        // Split the optimized LIR plan into the peek plan, dataflow metadata,
        // and the intermediate result type.
        let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
        let source_arity = typ.arity();

        emit_optimizer_notices(&*self.catalog, &*session, &df_meta.optimizer_notices);

        // If plan insights were requested (the trace is present), render them
        // and attach them to the session as a notice.
        if let Some(trace) = plan_insights_optimizer_trace {
            let target_cluster = self.catalog().get_cluster(cluster_id);
            let features = OptimizerFeatures::from(self.catalog().system_config())
                .override_from(&target_cluster.config.features());
            let insights = trace
                .into_plan_insights(
                    &features,
                    &self.catalog().for_session(session),
                    Some(plan.finishing),
                    Some(target_cluster),
                    df_meta,
                    insights_ctx,
                )
                .await?;
            session.add_notice(AdapterNotice::PlanInsights(insights));
        }

        let planned_peek = PlannedPeek {
            plan: peek_plan,
            determination: determination.clone(),
            conn_id: conn_id.clone(),
            intermediate_result_type: typ,
            source_arity,
            source_ids,
        };

        // Slow-path peeks build a transient index; record its ID for
        // statement logging. Fast-path peeks have none.
        if let Some(transient_index_id) = match &planned_peek.plan {
            peek::PeekPlan::FastPath(_) => None,
            peek::PeekPlan::SlowPath(PeekDataflowPlan { id, .. }) => Some(id),
        } {
            if let Some(statement_logging_id) = ctx.extra.contents() {
                self.set_transient_index_id(statement_logging_id, *transient_index_id);
            }
        }

        // Install watch sets so statement logging can observe when the
        // peek's dependencies reach the chosen timestamp.
        if let Some(logging_id) = ctx.extra().contents() {
            let watch_set = WatchSetCreation::new(
                logging_id,
                self.catalog.state(),
                &id_bundle,
                determination.timestamp_context.timestamp_or_default(),
            );
            self.install_peek_watch_sets(conn_id.clone(), watch_set).expect("the old peek sequencing re-verifies the dependencies' existence before installing the new watch sets");
        }

        let max_result_size = self.catalog().system_config().max_result_size();

        // Implement the peek, and capture the response.
        let resp = self
            .implement_peek_plan(
                ctx.extra_mut(),
                planned_peek,
                finishing,
                cluster_id,
                target_replica,
                max_result_size,
                max_query_result_size,
            )
            .await?;

        // If requested via session var, explain the chosen timestamp as a notice.
        if ctx.session().vars().emit_timestamp_notice() {
            let explanation = self.explain_timestamp(
                ctx.session().conn_id(),
                ctx.session().pcx().wall_time,
                cluster_id,
                &id_bundle,
                determination,
            );
            ctx.session()
                .add_notice(AdapterNotice::QueryTimestamp { explanation });
        }

        // `COPY ... TO STDOUT`-style responses wrap the inner response with
        // the requested format.
        let resp = match plan.copy_to {
            None => resp,
            Some(format) => ExecuteResponse::CopyTo {
                format,
                resp: Box::new(resp),
            },
        };
        Ok(StageResult::Response(resp))
    }
897
898    #[instrument]
899    async fn peek_copy_to_preflight(
900        &self,
901        copy_to: PeekStageCopyTo,
902    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
903        let connection_context = self.connection_context().clone();
904        Ok(StageResult::Handle(mz_ore::task::spawn(
905            || "peek copy to preflight",
906            async {
907                let sinks = &copy_to.global_lir_plan.df_desc().sink_exports;
908                if sinks.len() != 1 {
909                    return Err(AdapterError::Internal(
910                        "expected exactly one copy to s3 sink".into(),
911                    ));
912                }
913                let (sink_id, sink_desc) = sinks
914                    .first_key_value()
915                    .expect("known to be exactly one copy to s3 sink");
916                match &sink_desc.connection {
917                    ComputeSinkConnection::CopyToS3Oneshot(conn) => {
918                        mz_storage_types::sinks::s3_oneshot_sink::preflight(
919                            connection_context,
920                            &conn.aws_connection,
921                            &conn.upload_info,
922                            conn.connection_id,
923                            *sink_id,
924                        )
925                        .await?;
926                        Ok(Box::new(PeekStage::CopyToDataflow(copy_to)))
927                    }
928                    _ => Err(AdapterError::Internal(
929                        "expected copy to s3 oneshot sink".into(),
930                    )),
931                }
932            },
933        )))
934    }
935
936    #[instrument]
937    async fn peek_copy_to_dataflow(
938        &mut self,
939        ctx: &ExecuteContext,
940        PeekStageCopyTo {
941            validity: _,
942            optimizer,
943            global_lir_plan,
944            optimization_finished_at,
945            source_ids,
946        }: PeekStageCopyTo,
947    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
948        if let Some(id) = ctx.extra.contents() {
949            self.record_statement_lifecycle_event(
950                &id,
951                &StatementLifecycleEvent::OptimizationFinished,
952                optimization_finished_at,
953            );
954        }
955
956        let sink_id = global_lir_plan.sink_id();
957        let cluster_id = optimizer.cluster_id();
958
959        let (df_desc, df_meta) = global_lir_plan.unapply();
960
961        emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
962
963        // Callback for the active copy to.
964        let (tx, rx) = oneshot::channel();
965        let active_copy_to = ActiveCopyTo {
966            conn_id: ctx.session().conn_id().clone(),
967            tx,
968            cluster_id,
969            depends_on: source_ids,
970        };
971        // Add metadata for the new COPY TO. CopyTo returns a `ready` future, so it is safe to drop.
972        drop(
973            self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
974                .await,
975        );
976
977        // Ship dataflow.
978        self.ship_dataflow(df_desc, cluster_id, None).await;
979
980        let span = Span::current();
981        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
982            || "peek copy to dataflow",
983            async {
984                let res = rx.await;
985                match res {
986                    Ok(res) => res,
987                    Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
988                }
989            }
990            .instrument(span),
991        )))
992    }
993
994    #[instrument]
995    async fn peek_explain_plan(
996        &self,
997        session: &Session,
998        PeekStageExplainPlan {
999            optimizer,
1000            insights_ctx,
1001            df_meta,
1002            explain_ctx,
1003            ..
1004        }: PeekStageExplainPlan,
1005    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
1006        let rows = super::super::explain_plan_inner(
1007            session,
1008            self.catalog(),
1009            df_meta,
1010            explain_ctx,
1011            optimizer,
1012            insights_ctx,
1013        )
1014        .await?;
1015
1016        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
1017    }
1018
1019    #[instrument]
1020    async fn peek_explain_pushdown(
1021        &self,
1022        session: &Session,
1023        stage: PeekStageExplainPushdown,
1024    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
1025        let as_of = stage.determination.timestamp_context.antichain();
1026        let mz_now = stage
1027            .determination
1028            .timestamp_context
1029            .timestamp()
1030            .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1031            .unwrap_or_else(ResultSpec::value_all);
1032        let fut = self
1033            .explain_pushdown_future(session, as_of, mz_now, stage.imports)
1034            .await;
1035        let span = Span::current();
1036        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
1037            || "peek explain pushdown",
1038            fut.instrument(span),
1039        )))
1040    }
1041
    /// Determines the query timestamp and acquires read holds on dependent sources
    /// if necessary.
    ///
    /// Reuses the transaction's existing timestamp for immediate multi-statement
    /// transactions; otherwise determines a fresh one. Also verifies that the
    /// statement's referenced objects lie within an existing transaction's
    /// timedomain (erroring with `RelationOutsideTimeDomain` if not), stores
    /// newly acquired read holds for first statements, and records peek
    /// operations on the transaction where applicable.
    #[instrument]
    pub(super) fn sequence_peek_timestamp(
        &mut self,
        session: &mut Session,
        when: &QueryWhen,
        cluster_id: ClusterId,
        timeline_context: TimelineContext,
        oracle_read_ts: Option<Timestamp>,
        source_bundle: &CollectionIdBundle,
        source_ids: &BTreeSet<GlobalId>,
        real_time_recency_ts: Option<Timestamp>,
        requires_linearization: RequireLinearization,
    ) -> Result<TimestampDetermination<Timestamp>, AdapterError> {
        let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when);
        // Declared here (unassigned) so a reference to it can outlive the
        // branch below that initializes it.
        let timedomain_bundle;

        // Fetch or generate a timestamp for this query and what the read holds would be if we need to set
        // them.
        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
            // Use the transaction's timestamp if it exists and this isn't an AS OF query.
            Some(
                determination @ TimestampDetermination {
                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
                    ..
                },
            ) if in_immediate_multi_stmt_txn => (determination, None),
            _ => {
                let determine_bundle = if in_immediate_multi_stmt_txn {
                    // In a transaction, determine a timestamp that will be valid for anything in
                    // any schema referenced by the first query.
                    timedomain_bundle = timedomain_for(
                        self.catalog(),
                        &self.index_oracle(cluster_id),
                        source_ids,
                        &timeline_context,
                        session.conn_id(),
                        cluster_id,
                    )?;

                    &timedomain_bundle
                } else {
                    // If not in a transaction, use the source.
                    source_bundle
                };
                let (determination, read_holds) = self.determine_timestamp(
                    session,
                    determine_bundle,
                    when,
                    cluster_id,
                    &timeline_context,
                    oracle_read_ts,
                    real_time_recency_ts,
                )?;
                // We only need read holds if the read depends on a timestamp.
                let read_holds = match determination.timestamp_context.timestamp() {
                    Some(_ts) => Some(read_holds),
                    None => {
                        // We don't need the read holds and shouldn't add them
                        // to the txn.
                        //
                        // TODO: Handle this within determine_timestamp.
                        drop(read_holds);
                        None
                    }
                };
                (determination, read_holds)
            }
        };

        // Always either verify the current statement ids are within the existing
        // transaction's read hold set (timedomain), or create the read holds if this is the
        // first statement in a transaction (or this is a single statement transaction).
        // This must happen even if this is an `AS OF` query as well. There are steps after
        // this that happen off thread, so no matter the kind of statement or transaction,
        // we must acquire read holds here so they are held until the off-thread work
        // returns to the coordinator.

        if let Some(txn_reads) = self.txn_read_holds.get(session.conn_id()) {
            // Find referenced ids not in the read hold. A reference could be caused by a
            // user specifying an object in a different schema than the first query. An
            // index could be caused by a CREATE INDEX after the transaction started.
            let allowed_id_bundle = txn_reads.id_bundle();

            // We don't need the read holds that determine_timestamp acquired
            // for us.
            drop(read_holds);

            let outside = source_bundle.difference(&allowed_id_bundle);
            // Queries without a timestamp and timeline can belong to any existing timedomain.
            if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
                let valid_names =
                    allowed_id_bundle.resolve_names(self.catalog(), session.conn_id());
                let invalid_names = outside.resolve_names(self.catalog(), session.conn_id());
                return Err(AdapterError::RelationOutsideTimeDomain {
                    relations: invalid_names,
                    names: valid_names,
                });
            }
        } else if let Some(read_holds) = read_holds {
            // First statement of the transaction (or a single-statement
            // transaction): stash the freshly acquired read holds.
            self.store_transaction_read_holds(session.conn_id().clone(), read_holds);
        }

        // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
        // arbitrary and we don't recall why we did it (possibly an error!). Change this to always
        // set the transaction ops. Decide and document what our policy should be on AS OF queries.
        // Maybe they shouldn't be allowed in transactions at all because it's hard to explain
        // what's going on there. This should probably get a small design document.

        // We only track the peeks in the session if the query doesn't use AS
        // OF or we're inside an explicit transaction. The latter case is
        // necessary to support PG's `BEGIN` semantics, whose behavior can
        // depend on whether or not reads have occurred in the txn.
        let mut transaction_determination = determination.clone();
        if when.is_transactional() {
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id,
                requires_linearization,
            })?;
        } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
            // If the query uses AS OF, then ignore the timestamp.
            transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id,
                requires_linearization,
            })?;
        };

        Ok(determination)
    }
1175}