// mz_adapter/coord/sequencer/inner/peek.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::str::FromStr;
12use std::sync::Arc;
13
14use http::Uri;
15use itertools::Either;
16use maplit::btreemap;
17use mz_compute_types::sinks::ComputeSinkConnection;
18use mz_controller_types::ClusterId;
19use mz_expr::{CollectionPlan, ResultSpec};
20use mz_ore::cast::CastFrom;
21use mz_ore::instrument;
22use mz_repr::explain::{ExprHumanizerExt, TransientItem};
23use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
24use mz_repr::{Datum, GlobalId, RowArena, Timestamp};
25use mz_sql::ast::{ExplainStage, Statement};
26use mz_sql::catalog::CatalogCluster;
27// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
28use mz_catalog::memory::objects::CatalogItem;
29use mz_sql::plan::QueryWhen;
30use mz_sql::plan::{self, HirScalarExpr};
31use mz_sql::session::metadata::SessionMetadata;
32use mz_transform::EmptyStatisticsOracle;
33use tokio::sync::oneshot;
34use tracing::warn;
35use tracing::{Instrument, Span};
36
37use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
38use crate::command::ExecuteResponse;
39use crate::coord::id_bundle::CollectionIdBundle;
40use crate::coord::peek::{self, PeekDataflowPlan, PeekPlan, PlannedPeek};
41use crate::coord::sequencer::inner::{check_log_reads, return_if_err};
42use crate::coord::timeline::TimelineContext;
43use crate::coord::timestamp_selection::{
44    TimestampContext, TimestampDetermination, TimestampProvider,
45};
46use crate::coord::{
47    Coordinator, CopyToContext, ExecuteContext, ExplainContext, ExplainPlanContext, Message,
48    PeekStage, PeekStageCopyTo, PeekStageExplainPlan, PeekStageExplainPushdown, PeekStageFinish,
49    PeekStageLinearizeTimestamp, PeekStageOptimize, PeekStageRealTimeRecency,
50    PeekStageTimestampReadHold, PlanValidity, StageResult, Staged, TargetCluster, WatchSetResponse,
51};
52use crate::error::AdapterError;
53use crate::explain::insights::PlanInsightsContext;
54use crate::explain::optimizer_trace::OptimizerTrace;
55use crate::notice::AdapterNotice;
56use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
57use crate::optimize::{self, Optimize};
58use crate::session::{RequireLinearization, Session, TransactionOps, TransactionStatus};
59use crate::statement_logging::StatementLifecycleEvent;
60
61impl Staged for PeekStage {
62    type Ctx = ExecuteContext;
63
64    fn validity(&mut self) -> &mut PlanValidity {
65        match self {
66            PeekStage::LinearizeTimestamp(stage) => &mut stage.validity,
67            PeekStage::RealTimeRecency(stage) => &mut stage.validity,
68            PeekStage::TimestampReadHold(stage) => &mut stage.validity,
69            PeekStage::Optimize(stage) => &mut stage.validity,
70            PeekStage::Finish(stage) => &mut stage.validity,
71            PeekStage::ExplainPlan(stage) => &mut stage.validity,
72            PeekStage::ExplainPushdown(stage) => &mut stage.validity,
73            PeekStage::CopyToPreflight(stage) => &mut stage.validity,
74            PeekStage::CopyToDataflow(stage) => &mut stage.validity,
75        }
76    }
77
78    async fn stage(
79        self,
80        coord: &mut Coordinator,
81        ctx: &mut ExecuteContext,
82    ) -> Result<StageResult<Box<Self>>, AdapterError> {
83        match self {
84            PeekStage::LinearizeTimestamp(stage) => {
85                coord.peek_linearize_timestamp(ctx.session(), stage).await
86            }
87            PeekStage::RealTimeRecency(stage) => {
88                coord.peek_real_time_recency(ctx.session(), stage).await
89            }
90            PeekStage::TimestampReadHold(stage) => {
91                coord.peek_timestamp_read_hold(ctx.session_mut(), stage)
92            }
93            PeekStage::Optimize(stage) => coord.peek_optimize(ctx.session(), stage).await,
94            PeekStage::Finish(stage) => coord.peek_finish(ctx, stage).await,
95            PeekStage::ExplainPlan(stage) => coord.peek_explain_plan(ctx.session(), stage).await,
96            PeekStage::ExplainPushdown(stage) => {
97                coord.peek_explain_pushdown(ctx.session(), stage).await
98            }
99            PeekStage::CopyToPreflight(stage) => coord.peek_copy_to_preflight(stage).await,
100            PeekStage::CopyToDataflow(stage) => coord.peek_copy_to_dataflow(ctx, stage).await,
101        }
102    }
103
104    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
105        Message::PeekStageReady {
106            ctx,
107            span,
108            stage: self,
109        }
110    }
111
112    fn cancel_enabled(&self) -> bool {
113        true
114    }
115}
116
117impl Coordinator {
118    /// Sequence a peek, determining a timestamp and the most efficient dataflow interaction.
119    ///
120    /// Peeks are sequenced by assigning a timestamp for evaluation, and then determining and
121    /// deploying the most efficient evaluation plan. The peek could evaluate to a constant,
122    /// be a simple read out of an existing arrangement, or required a new dataflow to build
123    /// the results to return.
124    #[instrument]
125    pub(crate) async fn sequence_peek(
126        &mut self,
127        ctx: ExecuteContext,
128        plan: plan::SelectPlan,
129        target_cluster: TargetCluster,
130        max_query_result_size: Option<u64>,
131    ) {
132        let explain_ctx = if ctx.session().vars().emit_plan_insights_notice() {
133            let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
134            ExplainContext::PlanInsightsNotice(optimizer_trace)
135        } else {
136            ExplainContext::None
137        };
138
139        let stage = return_if_err!(
140            self.peek_validate(
141                ctx.session(),
142                plan,
143                target_cluster,
144                None,
145                explain_ctx,
146                max_query_result_size
147            ),
148            ctx
149        );
150        self.sequence_staged(ctx, Span::current(), stage).await;
151    }
152
153    #[instrument]
154    pub(crate) async fn sequence_copy_to(
155        &mut self,
156        ctx: ExecuteContext,
157        plan::CopyToPlan {
158            select_plan,
159            desc,
160            to,
161            connection,
162            connection_id,
163            format,
164            max_file_size,
165        }: plan::CopyToPlan,
166        target_cluster: TargetCluster,
167    ) {
168        let eval_uri = |to: HirScalarExpr| -> Result<Uri, AdapterError> {
169            let style = ExprPrepStyle::OneShot {
170                logical_time: EvalTime::NotAvailable,
171                session: ctx.session(),
172                catalog_state: self.catalog().state(),
173            };
174            let mut to = to.lower_uncorrelated()?;
175            prep_scalar_expr(&mut to, style)?;
176            let temp_storage = RowArena::new();
177            let evaled = to.eval(&[], &temp_storage)?;
178            if evaled == Datum::Null {
179                coord_bail!("COPY TO target value can not be null");
180            }
181            let to_url = match Uri::from_str(evaled.unwrap_str()) {
182                Ok(url) => {
183                    if url.scheme_str() != Some("s3") {
184                        coord_bail!("only 's3://...' urls are supported as COPY TO target");
185                    }
186                    url
187                }
188                Err(e) => coord_bail!("could not parse COPY TO target url: {}", e),
189            };
190            Ok(to_url)
191        };
192
193        let uri = return_if_err!(eval_uri(to), ctx);
194
195        let stage = return_if_err!(
196            self.peek_validate(
197                ctx.session(),
198                select_plan,
199                target_cluster,
200                Some(CopyToContext {
201                    desc,
202                    uri,
203                    connection,
204                    connection_id,
205                    format,
206                    max_file_size,
207                    // This will be set in `peek_stage_validate` stage below.
208                    output_batch_count: None,
209                }),
210                ExplainContext::None,
211                Some(ctx.session().vars().max_query_result_size()),
212            ),
213            ctx
214        );
215        self.sequence_staged(ctx, Span::current(), stage).await;
216    }
217
218    #[instrument]
219    pub(crate) async fn explain_peek(
220        &mut self,
221        ctx: ExecuteContext,
222        plan::ExplainPlanPlan {
223            stage,
224            format,
225            config,
226            explainee,
227        }: plan::ExplainPlanPlan,
228        target_cluster: TargetCluster,
229    ) {
230        let plan::Explainee::Statement(stmt) = explainee else {
231            // This is currently asserted in the `sequence_explain_plan` code that
232            // calls this method.
233            unreachable!()
234        };
235        let plan::ExplaineeStatement::Select { broken, plan, desc } = stmt else {
236            // This is currently asserted in the `sequence_explain_plan` code that
237            // calls this method.
238            unreachable!()
239        };
240
241        // Create an OptimizerTrace instance to collect plans emitted when
242        // executing the optimizer pipeline.
243        let optimizer_trace = OptimizerTrace::new(stage.paths());
244
245        let stage = return_if_err!(
246            self.peek_validate(
247                ctx.session(),
248                plan,
249                target_cluster,
250                None,
251                ExplainContext::Plan(ExplainPlanContext {
252                    broken,
253                    config,
254                    format,
255                    stage,
256                    replan: None,
257                    desc: Some(desc),
258                    optimizer_trace,
259                }),
260                Some(ctx.session().vars().max_query_result_size()),
261            ),
262            ctx
263        );
264        self.sequence_staged(ctx, Span::current(), stage).await;
265    }
266
267    /// Do some simple validation. We must defer most of it until after any off-thread work.
268    #[instrument]
269    pub fn peek_validate(
270        &self,
271        session: &Session,
272        plan: mz_sql::plan::SelectPlan,
273        target_cluster: TargetCluster,
274        copy_to_ctx: Option<CopyToContext>,
275        explain_ctx: ExplainContext,
276        max_query_result_size: Option<u64>,
277    ) -> Result<PeekStage, AdapterError> {
278        // Collect optimizer parameters.
279        let catalog = self.owned_catalog();
280        let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
281        let compute_instance = self
282            .instance_snapshot(cluster.id())
283            .expect("compute instance does not exist");
284        let (_, view_id) = self.allocate_transient_id();
285        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
286            .override_from(&self.catalog.get_cluster(cluster.id()).config.features())
287            .override_from(&explain_ctx);
288
289        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
290            return Err(AdapterError::NoClusterReplicasAvailable {
291                name: cluster.name.clone(),
292                is_managed: cluster.is_managed(),
293            });
294        }
295
296        let optimizer = match copy_to_ctx {
297            None => {
298                // Collect optimizer parameters specific to the peek::Optimizer.
299                let compute_instance = self
300                    .instance_snapshot(cluster.id())
301                    .expect("compute instance does not exist");
302                let (_, view_id) = self.allocate_transient_id();
303                let (_, index_id) = self.allocate_transient_id();
304
305                // Build an optimizer for this SELECT.
306                Either::Left(optimize::peek::Optimizer::new(
307                    Arc::clone(&catalog),
308                    compute_instance,
309                    plan.finishing.clone(),
310                    view_id,
311                    index_id,
312                    optimizer_config,
313                    self.optimizer_metrics(),
314                ))
315            }
316            Some(mut copy_to_ctx) => {
317                // Getting the max worker count across replicas
318                // and using that value for the number of batches to
319                // divide the copy output into.
320                let max_worker_count = match cluster
321                    .replicas()
322                    .map(|r| r.config.location.workers())
323                    .max()
324                {
325                    Some(count) => u64::cast_from(count),
326                    None => {
327                        return Err(AdapterError::NoClusterReplicasAvailable {
328                            name: cluster.name.clone(),
329                            is_managed: cluster.is_managed(),
330                        });
331                    }
332                };
333                copy_to_ctx.output_batch_count = Some(max_worker_count);
334                // Build an optimizer for this COPY TO.
335                Either::Right(optimize::copy_to::Optimizer::new(
336                    Arc::clone(&catalog),
337                    compute_instance,
338                    view_id,
339                    copy_to_ctx,
340                    optimizer_config,
341                    self.optimizer_metrics(),
342                ))
343            }
344        };
345
346        let target_replica_name = session.vars().cluster_replica();
347        let mut target_replica = target_replica_name
348            .map(|name| {
349                cluster
350                    .replica_id(name)
351                    .ok_or(AdapterError::UnknownClusterReplica {
352                        cluster_name: cluster.name.clone(),
353                        replica_name: name.to_string(),
354                    })
355            })
356            .transpose()?;
357
358        let source_ids = plan.source.depends_on();
359        let mut timeline_context = self.validate_timeline_context(source_ids.iter().copied())?;
360        if matches!(timeline_context, TimelineContext::TimestampIndependent)
361            && plan.source.contains_temporal()?
362        {
363            // If the source IDs are timestamp independent but the query contains temporal functions,
364            // then the timeline context needs to be upgraded to timestamp dependent. This is
365            // required because `source_ids` doesn't contain functions.
366            timeline_context = TimelineContext::TimestampDependent;
367        }
368
369        let notices = check_log_reads(
370            &catalog,
371            cluster,
372            &source_ids,
373            &mut target_replica,
374            session.vars(),
375        )?;
376        session.add_notices(notices);
377
378        let dependencies = source_ids
379            .iter()
380            .map(|id| self.catalog.resolve_item_id(id))
381            .collect();
382        let validity = PlanValidity::new(
383            catalog.transient_revision(),
384            dependencies,
385            Some(cluster.id()),
386            target_replica,
387            session.role_metadata().clone(),
388        );
389
390        Ok(PeekStage::LinearizeTimestamp(PeekStageLinearizeTimestamp {
391            validity,
392            plan,
393            max_query_result_size,
394            source_ids,
395            target_replica,
396            timeline_context,
397            optimizer,
398            explain_ctx,
399        }))
400    }
401
402    /// Possibly linearize a timestamp from a `TimestampOracle`.
403    #[instrument]
404    async fn peek_linearize_timestamp(
405        &self,
406        session: &Session,
407        PeekStageLinearizeTimestamp {
408            validity,
409            source_ids,
410            plan,
411            max_query_result_size,
412            target_replica,
413            timeline_context,
414            optimizer,
415            explain_ctx,
416        }: PeekStageLinearizeTimestamp,
417    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
418        let isolation_level = session.vars().transaction_isolation().clone();
419        let timeline = Coordinator::get_timeline(&timeline_context);
420        let needs_linearized_read_ts =
421            Coordinator::needs_linearized_read_ts(&isolation_level, &plan.when);
422
423        let build_stage = move |oracle_read_ts: Option<Timestamp>| PeekStageRealTimeRecency {
424            validity,
425            plan,
426            max_query_result_size,
427            source_ids,
428            target_replica,
429            timeline_context,
430            oracle_read_ts,
431            optimizer,
432            explain_ctx,
433        };
434
435        match timeline {
436            Some(timeline) if needs_linearized_read_ts => {
437                let oracle = self.get_timestamp_oracle(&timeline);
438
439                // We ship the timestamp oracle off to an async task, so that we
440                // don't block the main task while we wait.
441
442                let span = Span::current();
443                Ok(StageResult::Handle(mz_ore::task::spawn(
444                    || "linearize timestamp",
445                    async move {
446                        let oracle_read_ts = oracle.read_ts().await;
447                        let stage = build_stage(Some(oracle_read_ts));
448                        let stage = PeekStage::RealTimeRecency(stage);
449                        Ok(Box::new(stage))
450                    }
451                    .instrument(span),
452                )))
453            }
454            Some(_) | None => {
455                let stage = build_stage(None);
456                let stage = PeekStage::RealTimeRecency(stage);
457                Ok(StageResult::Immediate(Box::new(stage)))
458            }
459        }
460    }
461
462    /// Determine a read timestamp and create appropriate read holds.
463    #[instrument]
464    fn peek_timestamp_read_hold(
465        &mut self,
466        session: &mut Session,
467        PeekStageTimestampReadHold {
468            mut validity,
469            plan,
470            max_query_result_size,
471            source_ids,
472            target_replica,
473            timeline_context,
474            oracle_read_ts,
475            real_time_recency_ts,
476            optimizer,
477            explain_ctx,
478        }: PeekStageTimestampReadHold,
479    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
480        let cluster_id = match optimizer.as_ref() {
481            Either::Left(optimizer) => optimizer.cluster_id(),
482            Either::Right(optimizer) => optimizer.cluster_id(),
483        };
484        let id_bundle = self
485            .dataflow_builder(cluster_id)
486            .sufficient_collections(source_ids.iter().copied());
487
488        // Although we have added `sources.depends_on()` to the validity already, also add the
489        // sufficient collections for safety.
490        let item_ids = id_bundle
491            .iter()
492            .map(|id| self.catalog().resolve_item_id(&id));
493        validity.extend_dependencies(item_ids);
494
495        let determination = self.sequence_peek_timestamp(
496            session,
497            &plan.when,
498            cluster_id,
499            timeline_context,
500            oracle_read_ts,
501            &id_bundle,
502            &source_ids,
503            real_time_recency_ts,
504            (&explain_ctx).into(),
505        )?;
506
507        let stage = PeekStage::Optimize(PeekStageOptimize {
508            validity,
509            plan,
510            max_query_result_size,
511            source_ids,
512            id_bundle,
513            target_replica,
514            determination,
515            optimizer,
516            explain_ctx,
517        });
518        Ok(StageResult::Immediate(Box::new(stage)))
519    }
520
521    #[instrument]
522    async fn peek_optimize(
523        &self,
524        session: &Session,
525        PeekStageOptimize {
526            validity,
527            plan,
528            max_query_result_size,
529            source_ids,
530            id_bundle,
531            target_replica,
532            determination,
533            mut optimizer,
534            explain_ctx,
535        }: PeekStageOptimize,
536    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
537        // Generate data structures that can be moved to another task where we will perform possibly
538        // expensive optimizations.
539        let timestamp_context = determination.timestamp_context.clone();
540        let stats = self
541            .statistics_oracle(session, &source_ids, &timestamp_context.antichain(), true)
542            .await
543            .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
544        let session = session.meta();
545        let now = self.catalog().config().now.clone();
546        let catalog = self.owned_catalog();
547        let mut compute_instances = BTreeMap::new();
548        if explain_ctx.needs_plan_insights() {
549            // There's a chance for index skew (indexes were created/deleted between stages) from the
550            // original plan, but that seems acceptable for insights.
551            for cluster in self.catalog().user_clusters() {
552                let snapshot = self.instance_snapshot(cluster.id).expect("must exist");
553                compute_instances.insert(cluster.name.clone(), snapshot);
554            }
555        }
556
557        let span = Span::current();
558        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
559            || "optimize peek",
560            move || {
561                span.in_scope(|| {
562                    let pipeline = || -> Result<Either<optimize::peek::GlobalLirPlan, optimize::copy_to::GlobalLirPlan>, AdapterError> {
563                        let _dispatch_guard = explain_ctx.dispatch_guard();
564
565                        let raw_expr = plan.source.clone();
566
567                        match optimizer.as_mut() {
568                            // Optimize SELECT statement.
569                            Either::Left(optimizer) => {
570                                // HIR ⇒ MIR lowering and MIR optimization (local)
571                                let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
572                                // Attach resolved context required to continue the pipeline.
573                                let local_mir_plan = local_mir_plan.resolve(timestamp_context.clone(), &session, stats);
574                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
575                                let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
576
577                                Ok(Either::Left(global_lir_plan))
578                            }
579                            // Optimize COPY TO statement.
580                            Either::Right(optimizer) => {
581                                // HIR ⇒ MIR lowering and MIR optimization (local)
582                                let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
583                                // Attach resolved context required to continue the pipeline.
584                                let local_mir_plan = local_mir_plan.resolve(timestamp_context.clone(), &session, stats);
585                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
586                                let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
587
588                                Ok(Either::Right(global_lir_plan))
589                            }
590                        }
591                    };
592
593                    let pipeline_result = pipeline();
594                    let optimization_finished_at = now();
595
596                    let stage = match pipeline_result {
597                        Ok(Either::Left(global_lir_plan)) => {
598                            let optimizer = optimizer.unwrap_left();
599                        // Enable fast path cluster calculation for slow path plans.
600                        let needs_plan_insights = explain_ctx.needs_plan_insights();
601                        // Disable anything that uses the optimizer if we only want the notice and
602                        // plan optimization took longer than the threshold. This is to prevent a
603                        // situation where optimizing takes a while and there a lots of clusters,
604                        // which would delay peek execution by the product of those.
605                        let opt_limit = mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION.get(catalog.system_config().dyncfgs());
606                        let target_instance = catalog
607                            .get_cluster(optimizer.cluster_id())
608                            .name
609                            .clone();
610                        let enable_re_optimize =
611                            !(matches!(explain_ctx, ExplainContext::PlanInsightsNotice(_))
612                                && optimizer.duration() > opt_limit);
613                        let insights_ctx = needs_plan_insights.then(|| PlanInsightsContext {
614                            stmt: plan.select.as_deref().map(Clone::clone).map(Statement::Select),
615                            raw_expr: plan.source.clone(),
616                            catalog,
617                            compute_instances,
618                            target_instance,
619                            metrics: optimizer.metrics().clone(),
620                            finishing: optimizer.finishing().clone(),
621                            optimizer_config: optimizer.config().clone(),
622                            session,
623                            timestamp_context,
624                            view_id: optimizer.select_id(),
625                            index_id: optimizer.index_id(),
626                            enable_re_optimize,
627                        }).map(Box::new);
628                            match explain_ctx {
629                                ExplainContext::Plan(explain_ctx) => {
630                                    let (_, df_meta, _) = global_lir_plan.unapply();
631                                    PeekStage::ExplainPlan(PeekStageExplainPlan {
632                                        validity,
633                                        optimizer,
634                                        df_meta,
635                                        explain_ctx,
636                                        insights_ctx,
637                                    })
638                                }
639                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
640                                    PeekStage::Finish(PeekStageFinish {
641                                        validity,
642                                        plan,
643                                        max_query_result_size,
644                                        id_bundle,
645                                        target_replica,
646                                        source_ids,
647                                        determination,
648                                        cluster_id: optimizer.cluster_id(),
649                                        finishing: optimizer.finishing().clone(),
650                                        plan_insights_optimizer_trace: Some(optimizer_trace),
651                                        global_lir_plan,
652                                        optimization_finished_at,
653                                        insights_ctx,
654                                    })
655                                }
656                                ExplainContext::None => PeekStage::Finish(PeekStageFinish {
657                                    validity,
658                                    plan,
659                                    max_query_result_size,
660                                    id_bundle,
661                                    target_replica,
662                                    source_ids,
663                                    determination,
664                                    cluster_id: optimizer.cluster_id(),
665                                    finishing: optimizer.finishing().clone(),
666                                    plan_insights_optimizer_trace: None,
667                                    global_lir_plan,
668                                    optimization_finished_at,
669                                    insights_ctx,
670                                }),
671                                ExplainContext::Pushdown => {
672                                    let (plan, _, _) = global_lir_plan.unapply();
673                                    let imports = match plan {
674                                        PeekPlan::SlowPath(plan) => plan
675                                            .desc
676                                            .source_imports
677                                            .into_iter()
678                                            .filter_map(|(id, (desc, _, _upper))| {
679                                                desc.arguments.operators.map(|mfp| (id, mfp))
680                                            })
681                                            .collect(),
682                                        PeekPlan::FastPath(_) => BTreeMap::default(),
683                                    };
684                                    PeekStage::ExplainPushdown(PeekStageExplainPushdown {
685                                        validity,
686                                        determination,
687                                        imports,
688                                    })
689                                }
690                            }
691                        }
692                        Ok(Either::Right(global_lir_plan)) => {
693                            let optimizer = optimizer.unwrap_right();
694                            PeekStage::CopyToPreflight(PeekStageCopyTo {
695                                validity,
696                                optimizer,
697                                global_lir_plan,
698                                optimization_finished_at,
699                                source_ids,
700                            })
701                        }
702                        // Internal optimizer errors are handled differently
703                        // depending on the caller.
704                        Err(err) => {
705                            let Some(optimizer) = optimizer.left() else {
706                                // In `COPY TO` contexts, immediately retire the
707                                // execution with the error.
708                                return Err(err);
709                            };
710                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
711                                // In `sequence_~` contexts, immediately retire the
712                                // execution with the error.
713                                return Err(err);
714                            };
715
716                            if explain_ctx.broken {
717                                // In `EXPLAIN BROKEN` contexts, just log the error
718                                // and move to the next stage with default
719                                // parameters.
720                                tracing::error!("error while handling EXPLAIN statement: {}", err);
721                                PeekStage::ExplainPlan(PeekStageExplainPlan {
722                                    validity,
723                                    optimizer,
724                                    df_meta: Default::default(),
725                                    explain_ctx,
726                                    insights_ctx: None,
727                            })
728                            } else {
729                                // In regular `EXPLAIN` contexts, immediately retire
730                                // the execution with the error.
731                                return Err(err);
732                            }
733                        }
734                    };
735                    Ok(Box::new(stage))
736                })
737            },
738        )))
739    }
740
741    #[instrument]
742    async fn peek_real_time_recency(
743        &self,
744        session: &Session,
745        PeekStageRealTimeRecency {
746            validity,
747            plan,
748            max_query_result_size,
749            source_ids,
750            target_replica,
751            timeline_context,
752            oracle_read_ts,
753            optimizer,
754            explain_ctx,
755        }: PeekStageRealTimeRecency,
756    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
757        let item_ids: Vec<_> = source_ids
758            .iter()
759            .map(|gid| self.catalog.resolve_item_id(gid))
760            .collect();
761        let fut = self
762            .determine_real_time_recent_timestamp(session, item_ids.into_iter())
763            .await?;
764
765        match fut {
766            Some(fut) => {
767                let span = Span::current();
768                Ok(StageResult::Handle(mz_ore::task::spawn(
769                    || "peek real time recency",
770                    async move {
771                        let real_time_recency_ts = fut.await?;
772                        let stage = PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
773                            validity,
774                            plan,
775                            max_query_result_size,
776                            target_replica,
777                            timeline_context,
778                            source_ids,
779                            optimizer,
780                            explain_ctx,
781                            oracle_read_ts,
782                            real_time_recency_ts: Some(real_time_recency_ts),
783                        });
784                        Ok(Box::new(stage))
785                    }
786                    .instrument(span),
787                )))
788            }
789            None => Ok(StageResult::Immediate(Box::new(
790                PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
791                    validity,
792                    plan,
793                    max_query_result_size,
794                    target_replica,
795                    timeline_context,
796                    source_ids,
797                    optimizer,
798                    explain_ctx,
799                    oracle_read_ts,
800                    real_time_recency_ts: None,
801                }),
802            ))),
803        }
804    }
805
806    #[instrument]
807    async fn peek_finish(
808        &mut self,
809        ctx: &mut ExecuteContext,
810        PeekStageFinish {
811            validity: _,
812            plan,
813            max_query_result_size,
814            id_bundle,
815            target_replica,
816            source_ids,
817            determination,
818            cluster_id,
819            finishing,
820            plan_insights_optimizer_trace,
821            global_lir_plan,
822            optimization_finished_at,
823            insights_ctx,
824        }: PeekStageFinish,
825    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
826        if let Some(id) = ctx.extra.contents() {
827            self.record_statement_lifecycle_event(
828                &id,
829                &StatementLifecycleEvent::OptimizationFinished,
830                optimization_finished_at,
831            );
832        }
833
834        let session = ctx.session_mut();
835        let conn_id = session.conn_id().clone();
836
837        let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
838        let source_arity = typ.arity();
839
840        self.emit_optimizer_notices(&*session, &df_meta.optimizer_notices);
841
842        let target_cluster = self.catalog().get_cluster(cluster_id);
843
844        let features = OptimizerFeatures::from(self.catalog().system_config())
845            .override_from(&target_cluster.config.features());
846
847        if let Some(trace) = plan_insights_optimizer_trace {
848            let insights = trace
849                .into_plan_insights(
850                    &features,
851                    &self.catalog().for_session(session),
852                    Some(plan.finishing),
853                    Some(target_cluster),
854                    df_meta,
855                    insights_ctx,
856                )
857                .await?;
858            session.add_notice(AdapterNotice::PlanInsights(insights));
859        }
860
861        let planned_peek = PlannedPeek {
862            plan: peek_plan,
863            determination: determination.clone(),
864            conn_id: conn_id.clone(),
865            source_arity,
866            source_ids,
867        };
868
869        if let Some(transient_index_id) = match &planned_peek.plan {
870            peek::PeekPlan::FastPath(_) => None,
871            peek::PeekPlan::SlowPath(PeekDataflowPlan { id, .. }) => Some(id),
872        } {
873            if let Some(statement_logging_id) = ctx.extra.contents() {
874                self.set_transient_index_id(statement_logging_id, *transient_index_id);
875            }
876        }
877
878        if let Some(uuid) = ctx.extra().contents() {
879            let ts = determination.timestamp_context.timestamp_or_default();
880            let mut transitive_storage_deps = BTreeSet::new();
881            let mut transitive_compute_deps = BTreeSet::new();
882            for item_id in id_bundle
883                .iter()
884                .map(|gid| self.catalog.state().get_entry_by_global_id(&gid).id())
885                .flat_map(|id| self.catalog.state().transitive_uses(id))
886            {
887                let entry = self.catalog.state().get_entry(&item_id);
888                match entry.item() {
889                    // TODO(alter_table): Adding all of the GlobalIds for an object is incorrect.
890                    // For example, this peek may depend on just a single version of a table, but
891                    // we would add dependencies on all versions of said table. Doing this is okay
892                    // for now since we can't yet version tables, but should get fixed.
893                    CatalogItem::Table(_) | CatalogItem::Source(_) => {
894                        transitive_storage_deps.extend(entry.global_ids());
895                    }
896                    CatalogItem::MaterializedView(_) | CatalogItem::Index(_) => {
897                        transitive_compute_deps.extend(entry.global_ids());
898                    }
899                    _ => {}
900                }
901            }
902            self.install_storage_watch_set(
903                conn_id.clone(),
904                transitive_storage_deps,
905                ts,
906                WatchSetResponse::StatementDependenciesReady(
907                    uuid,
908                    StatementLifecycleEvent::StorageDependenciesFinished,
909                ),
910            );
911            self.install_compute_watch_set(
912                conn_id,
913                transitive_compute_deps,
914                ts,
915                WatchSetResponse::StatementDependenciesReady(
916                    uuid,
917                    StatementLifecycleEvent::ComputeDependenciesFinished,
918                ),
919            )
920        }
921
922        let max_result_size = self.catalog().system_config().max_result_size();
923
924        // Implement the peek, and capture the response.
925        let resp = self
926            .implement_peek_plan(
927                ctx.extra_mut(),
928                planned_peek,
929                finishing,
930                cluster_id,
931                target_replica,
932                max_result_size,
933                max_query_result_size,
934            )
935            .await?;
936
937        if ctx.session().vars().emit_timestamp_notice() {
938            let explanation =
939                self.explain_timestamp(ctx.session(), cluster_id, &id_bundle, determination);
940            ctx.session()
941                .add_notice(AdapterNotice::QueryTimestamp { explanation });
942        }
943
944        let resp = match plan.copy_to {
945            None => resp,
946            Some(format) => ExecuteResponse::CopyTo {
947                format,
948                resp: Box::new(resp),
949            },
950        };
951        Ok(StageResult::Response(resp))
952    }
953
954    #[instrument]
955    async fn peek_copy_to_preflight(
956        &mut self,
957        copy_to: PeekStageCopyTo,
958    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
959        let connection_context = self.connection_context().clone();
960        Ok(StageResult::Handle(mz_ore::task::spawn(
961            || "peek copy to preflight",
962            async {
963                let sinks = &copy_to.global_lir_plan.df_desc().sink_exports;
964                if sinks.len() != 1 {
965                    return Err(AdapterError::Internal(
966                        "expected exactly one copy to s3 sink".into(),
967                    ));
968                }
969                let (sink_id, sink_desc) = sinks
970                    .first_key_value()
971                    .expect("known to be exactly one copy to s3 sink");
972                match &sink_desc.connection {
973                    ComputeSinkConnection::CopyToS3Oneshot(conn) => {
974                        mz_storage_types::sinks::s3_oneshot_sink::preflight(
975                            connection_context,
976                            &conn.aws_connection,
977                            &conn.upload_info,
978                            conn.connection_id,
979                            *sink_id,
980                        )
981                        .await?;
982                        Ok(Box::new(PeekStage::CopyToDataflow(copy_to)))
983                    }
984                    _ => Err(AdapterError::Internal(
985                        "expected copy to s3 oneshot sink".into(),
986                    )),
987                }
988            },
989        )))
990    }
991
992    #[instrument]
993    async fn peek_copy_to_dataflow(
994        &mut self,
995        ctx: &ExecuteContext,
996        PeekStageCopyTo {
997            validity: _,
998            optimizer,
999            global_lir_plan,
1000            optimization_finished_at,
1001            source_ids,
1002        }: PeekStageCopyTo,
1003    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
1004        if let Some(id) = ctx.extra.contents() {
1005            self.record_statement_lifecycle_event(
1006                &id,
1007                &StatementLifecycleEvent::OptimizationFinished,
1008                optimization_finished_at,
1009            );
1010        }
1011
1012        let sink_id = global_lir_plan.sink_id();
1013        let cluster_id = optimizer.cluster_id();
1014
1015        let (df_desc, df_meta) = global_lir_plan.unapply();
1016
1017        self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
1018
1019        // Callback for the active copy to.
1020        let (tx, rx) = oneshot::channel();
1021        let active_copy_to = ActiveCopyTo {
1022            conn_id: ctx.session().conn_id().clone(),
1023            tx,
1024            cluster_id,
1025            depends_on: source_ids,
1026        };
1027        // Add metadata for the new COPY TO. CopyTo returns a `ready` future, so it is safe to drop.
1028        drop(
1029            self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
1030                .await,
1031        );
1032
1033        // Ship dataflow.
1034        self.ship_dataflow(df_desc, cluster_id, None).await;
1035
1036        let span = Span::current();
1037        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
1038            || "peek copy to dataflow",
1039            async {
1040                let res = rx.await;
1041                match res {
1042                    Ok(res) => res,
1043                    Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
1044                }
1045            }
1046            .instrument(span),
1047        )))
1048    }
1049
1050    #[instrument]
1051    async fn peek_explain_plan(
1052        &self,
1053        session: &Session,
1054        PeekStageExplainPlan {
1055            optimizer,
1056            insights_ctx,
1057            df_meta,
1058            explain_ctx:
1059                ExplainPlanContext {
1060                    config,
1061                    format,
1062                    stage,
1063                    desc,
1064                    optimizer_trace,
1065                    ..
1066                },
1067            ..
1068        }: PeekStageExplainPlan,
1069    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
1070        let desc = desc.expect("RelationDesc for SelectPlan in EXPLAIN mode");
1071
1072        let session_catalog = self.catalog().for_session(session);
1073        let expr_humanizer = {
1074            let transient_items = btreemap! {
1075                optimizer.select_id() => TransientItem::new(
1076                    Some(vec![GlobalId::Explain.to_string()]),
1077                    Some(desc.iter_names().map(|c| c.to_string()).collect()),
1078                )
1079            };
1080            ExprHumanizerExt::new(transient_items, &session_catalog)
1081        };
1082
1083        let finishing = if optimizer.finishing().is_trivial(desc.arity()) {
1084            None
1085        } else {
1086            Some(optimizer.finishing().clone())
1087        };
1088
1089        let target_cluster = self.catalog().get_cluster(optimizer.cluster_id());
1090        let features = optimizer.config().features.clone();
1091
1092        let rows = optimizer_trace
1093            .into_rows(
1094                format,
1095                &config,
1096                &features,
1097                &expr_humanizer,
1098                finishing,
1099                Some(target_cluster),
1100                df_meta,
1101                stage,
1102                plan::ExplaineeStatementKind::Select,
1103                insights_ctx,
1104            )
1105            .await?;
1106
1107        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
1108    }
1109
1110    #[instrument]
1111    async fn peek_explain_pushdown(
1112        &self,
1113        session: &Session,
1114        stage: PeekStageExplainPushdown,
1115    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
1116        let as_of = stage.determination.timestamp_context.antichain();
1117        let mz_now = stage
1118            .determination
1119            .timestamp_context
1120            .timestamp()
1121            .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1122            .unwrap_or_else(ResultSpec::value_all);
1123        let fut = self
1124            .render_explain_pushdown_prepare(session, as_of, mz_now, stage.imports)
1125            .await;
1126        let span = Span::current();
1127        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
1128            || "peek explain pushdown",
1129            fut.instrument(span),
1130        )))
1131    }
1132
1133    /// Determines the query timestamp and acquires read holds on dependent sources
1134    /// if necessary.
1135    #[instrument]
1136    pub(super) fn sequence_peek_timestamp(
1137        &mut self,
1138        session: &mut Session,
1139        when: &QueryWhen,
1140        cluster_id: ClusterId,
1141        timeline_context: TimelineContext,
1142        oracle_read_ts: Option<Timestamp>,
1143        source_bundle: &CollectionIdBundle,
1144        source_ids: &BTreeSet<GlobalId>,
1145        real_time_recency_ts: Option<Timestamp>,
1146        requires_linearization: RequireLinearization,
1147    ) -> Result<TimestampDetermination<Timestamp>, AdapterError> {
1148        let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when);
1149        let timedomain_bundle;
1150
1151        // Fetch or generate a timestamp for this query and what the read holds would be if we need to set
1152        // them.
1153        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
1154            // Use the transaction's timestamp if it exists and this isn't an AS OF query.
1155            Some(
1156                determination @ TimestampDetermination {
1157                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
1158                    ..
1159                },
1160            ) if in_immediate_multi_stmt_txn => (determination, None),
1161            _ => {
1162                let determine_bundle = if in_immediate_multi_stmt_txn {
1163                    // In a transaction, determine a timestamp that will be valid for anything in
1164                    // any schema referenced by the first query.
1165                    timedomain_bundle = self.timedomain_for(
1166                        source_ids,
1167                        &timeline_context,
1168                        session.conn_id(),
1169                        cluster_id,
1170                    )?;
1171
1172                    &timedomain_bundle
1173                } else {
1174                    // If not in a transaction, use the source.
1175                    source_bundle
1176                };
1177                let (determination, read_holds) = self.determine_timestamp(
1178                    session,
1179                    determine_bundle,
1180                    when,
1181                    cluster_id,
1182                    &timeline_context,
1183                    oracle_read_ts,
1184                    real_time_recency_ts,
1185                )?;
1186                // We only need read holds if the read depends on a timestamp.
1187                let read_holds = match determination.timestamp_context.timestamp() {
1188                    Some(_ts) => Some(read_holds),
1189                    None => {
1190                        // We don't need the read holds and shouldn't add them
1191                        // to the txn.
1192                        //
1193                        // TODO: Handle this within determine_timestamp.
1194                        drop(read_holds);
1195                        None
1196                    }
1197                };
1198                (determination, read_holds)
1199            }
1200        };
1201
1202        // Always either verify the current statement ids are within the existing
1203        // transaction's read hold set (timedomain), or create the read holds if this is the
1204        // first statement in a transaction (or this is a single statement transaction).
1205        // This must happen even if this is an `AS OF` query as well. There are steps after
1206        // this that happen off thread, so no matter the kind of statement or transaction,
1207        // we must acquire read holds here so they are held until the off-thread work
1208        // returns to the coordinator.
1209
1210        if let Some(txn_reads) = self.txn_read_holds.get(session.conn_id()) {
1211            // Find referenced ids not in the read hold. A reference could be caused by a
1212            // user specifying an object in a different schema than the first query. An
1213            // index could be caused by a CREATE INDEX after the transaction started.
1214            let allowed_id_bundle = txn_reads.id_bundle();
1215
1216            // We don't need the read holds that determine_timestamp acquired
1217            // for us.
1218            drop(read_holds);
1219
1220            let outside = source_bundle.difference(&allowed_id_bundle);
1221            // Queries without a timestamp and timeline can belong to any existing timedomain.
1222            if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
1223                let valid_names =
1224                    self.resolve_collection_id_bundle_names(session, &allowed_id_bundle);
1225                let invalid_names = self.resolve_collection_id_bundle_names(session, &outside);
1226                return Err(AdapterError::RelationOutsideTimeDomain {
1227                    relations: invalid_names,
1228                    names: valid_names,
1229                });
1230            }
1231        } else if let Some(read_holds) = read_holds {
1232            self.store_transaction_read_holds(session, read_holds);
1233        }
1234
1235        // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
1236        // arbitrary and we don't recall why we did it (possibly an error!). Change this to always
1237        // set the transaction ops. Decide and document what our policy should be on AS OF queries.
1238        // Maybe they shouldn't be allowed in transactions at all because it's hard to explain
1239        // what's going on there. This should probably get a small design document.
1240
1241        // We only track the peeks in the session if the query doesn't use AS
1242        // OF or we're inside an explicit transaction. The latter case is
1243        // necessary to support PG's `BEGIN` semantics, whose behavior can
1244        // depend on whether or not reads have occurred in the txn.
1245        let mut transaction_determination = determination.clone();
1246        if when.is_transactional() {
1247            session.add_transaction_ops(TransactionOps::Peeks {
1248                determination: transaction_determination,
1249                cluster_id,
1250                requires_linearization,
1251            })?;
1252        } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
1253            // If the query uses AS OF, then ignore the timestamp.
1254            transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
1255            session.add_transaction_ops(TransactionOps::Peeks {
1256                determination: transaction_determination,
1257                cluster_id,
1258                requires_linearization,
1259            })?;
1260        };
1261
1262        Ok(determination)
1263    }
1264}