mz_adapter/coord/sequencer/inner/
peek.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::sync::Arc;
12
13use itertools::Either;
14use mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION;
15use mz_catalog::memory::objects::CatalogItem;
16use mz_compute_types::sinks::ComputeSinkConnection;
17use mz_controller_types::ClusterId;
18use mz_expr::{CollectionPlan, ResultSpec};
19use mz_ore::cast::CastFrom;
20use mz_ore::instrument;
21use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
22use mz_repr::{Datum, GlobalId, Timestamp};
23use mz_sql::ast::{ExplainStage, Statement};
24use mz_sql::catalog::CatalogCluster;
25// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
26use mz_sql::plan::QueryWhen;
27use mz_sql::plan::{self};
28use mz_sql::session::metadata::SessionMetadata;
29use mz_transform::EmptyStatisticsOracle;
30use tokio::sync::oneshot;
31use tracing::warn;
32use tracing::{Instrument, Span};
33
34use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
35use crate::command::ExecuteResponse;
36use crate::coord::id_bundle::CollectionIdBundle;
37use crate::coord::peek::{self, PeekDataflowPlan, PeekPlan, PlannedPeek};
38use crate::coord::sequencer::inner::return_if_err;
39use crate::coord::sequencer::{check_log_reads, emit_optimizer_notices, eval_copy_to_uri};
40use crate::coord::timeline::{TimelineContext, timedomain_for};
41use crate::coord::timestamp_selection::{
42    TimestampContext, TimestampDetermination, TimestampProvider,
43};
44use crate::coord::{
45    Coordinator, CopyToContext, ExecuteContext, ExplainContext, ExplainPlanContext, Message,
46    PeekStage, PeekStageCopyTo, PeekStageExplainPlan, PeekStageExplainPushdown, PeekStageFinish,
47    PeekStageLinearizeTimestamp, PeekStageOptimize, PeekStageRealTimeRecency,
48    PeekStageTimestampReadHold, PlanValidity, StageResult, Staged, TargetCluster, WatchSetResponse,
49};
50use crate::error::AdapterError;
51use crate::explain::insights::PlanInsightsContext;
52use crate::explain::optimizer_trace::OptimizerTrace;
53use crate::notice::AdapterNotice;
54use crate::optimize::{self, Optimize};
55use crate::session::{RequireLinearization, Session, TransactionOps, TransactionStatus};
56use crate::statement_logging::StatementLifecycleEvent;
57
58impl Staged for PeekStage {
59    type Ctx = ExecuteContext;
60
61    fn validity(&mut self) -> &mut PlanValidity {
62        match self {
63            PeekStage::LinearizeTimestamp(stage) => &mut stage.validity,
64            PeekStage::RealTimeRecency(stage) => &mut stage.validity,
65            PeekStage::TimestampReadHold(stage) => &mut stage.validity,
66            PeekStage::Optimize(stage) => &mut stage.validity,
67            PeekStage::Finish(stage) => &mut stage.validity,
68            PeekStage::ExplainPlan(stage) => &mut stage.validity,
69            PeekStage::ExplainPushdown(stage) => &mut stage.validity,
70            PeekStage::CopyToPreflight(stage) => &mut stage.validity,
71            PeekStage::CopyToDataflow(stage) => &mut stage.validity,
72        }
73    }
74
75    async fn stage(
76        self,
77        coord: &mut Coordinator,
78        ctx: &mut ExecuteContext,
79    ) -> Result<StageResult<Box<Self>>, AdapterError> {
80        match self {
81            PeekStage::LinearizeTimestamp(stage) => {
82                coord.peek_linearize_timestamp(ctx.session(), stage).await
83            }
84            PeekStage::RealTimeRecency(stage) => {
85                coord.peek_real_time_recency(ctx.session(), stage).await
86            }
87            PeekStage::TimestampReadHold(stage) => {
88                coord.peek_timestamp_read_hold(ctx.session_mut(), stage)
89            }
90            PeekStage::Optimize(stage) => coord.peek_optimize(ctx.session(), stage).await,
91            PeekStage::Finish(stage) => coord.peek_finish(ctx, stage).await,
92            PeekStage::ExplainPlan(stage) => coord.peek_explain_plan(ctx.session(), stage).await,
93            PeekStage::ExplainPushdown(stage) => {
94                coord.peek_explain_pushdown(ctx.session(), stage).await
95            }
96            PeekStage::CopyToPreflight(stage) => coord.peek_copy_to_preflight(stage).await,
97            PeekStage::CopyToDataflow(stage) => coord.peek_copy_to_dataflow(ctx, stage).await,
98        }
99    }
100
101    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
102        Message::PeekStageReady {
103            ctx,
104            span,
105            stage: self,
106        }
107    }
108
109    fn cancel_enabled(&self) -> bool {
110        true
111    }
112}
113
114impl Coordinator {
115    /// Sequence a peek, determining a timestamp and the most efficient dataflow interaction.
116    ///
117    /// Peeks are sequenced by assigning a timestamp for evaluation, and then determining and
118    /// deploying the most efficient evaluation plan. The peek could evaluate to a constant,
119    /// be a simple read out of an existing arrangement, or required a new dataflow to build
120    /// the results to return.
121    #[instrument]
122    pub(crate) async fn sequence_peek(
123        &mut self,
124        ctx: ExecuteContext,
125        plan: plan::SelectPlan,
126        target_cluster: TargetCluster,
127        max_query_result_size: Option<u64>,
128    ) {
129        let explain_ctx = if ctx.session().vars().emit_plan_insights_notice() {
130            let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
131            ExplainContext::PlanInsightsNotice(optimizer_trace)
132        } else {
133            ExplainContext::None
134        };
135
136        let stage = return_if_err!(
137            self.peek_validate(
138                ctx.session(),
139                plan,
140                target_cluster,
141                None,
142                explain_ctx,
143                max_query_result_size
144            ),
145            ctx
146        );
147        self.sequence_staged(ctx, Span::current(), stage).await;
148    }
149
150    #[instrument]
151    pub(crate) async fn sequence_copy_to(
152        &mut self,
153        ctx: ExecuteContext,
154        plan::CopyToPlan {
155            select_plan,
156            desc,
157            to,
158            connection,
159            connection_id,
160            format,
161            max_file_size,
162        }: plan::CopyToPlan,
163        target_cluster: TargetCluster,
164    ) {
165        let uri = return_if_err!(
166            eval_copy_to_uri(to, ctx.session(), self.catalog().state()),
167            ctx
168        );
169
170        let stage = return_if_err!(
171            self.peek_validate(
172                ctx.session(),
173                select_plan,
174                target_cluster,
175                Some(CopyToContext {
176                    desc,
177                    uri,
178                    connection,
179                    connection_id,
180                    format,
181                    max_file_size,
182                    // This will be set in `peek_stage_validate` stage below.
183                    output_batch_count: None,
184                }),
185                ExplainContext::None,
186                Some(ctx.session().vars().max_query_result_size()),
187            ),
188            ctx
189        );
190        self.sequence_staged(ctx, Span::current(), stage).await;
191    }
192
193    #[instrument]
194    pub(crate) async fn explain_peek(
195        &mut self,
196        ctx: ExecuteContext,
197        plan::ExplainPlanPlan {
198            stage,
199            format,
200            config,
201            explainee,
202        }: plan::ExplainPlanPlan,
203        target_cluster: TargetCluster,
204    ) {
205        let plan::Explainee::Statement(stmt) = explainee else {
206            // This is currently asserted in the `sequence_explain_plan` code that
207            // calls this method.
208            unreachable!()
209        };
210        let plan::ExplaineeStatement::Select { broken, plan, desc } = stmt else {
211            // This is currently asserted in the `sequence_explain_plan` code that
212            // calls this method.
213            unreachable!()
214        };
215
216        // Create an OptimizerTrace instance to collect plans emitted when
217        // executing the optimizer pipeline.
218        let optimizer_trace = OptimizerTrace::new(stage.paths());
219
220        let stage = return_if_err!(
221            self.peek_validate(
222                ctx.session(),
223                plan,
224                target_cluster,
225                None,
226                ExplainContext::Plan(ExplainPlanContext {
227                    broken,
228                    config,
229                    format,
230                    stage,
231                    replan: None,
232                    desc: Some(desc),
233                    optimizer_trace,
234                }),
235                Some(ctx.session().vars().max_query_result_size()),
236            ),
237            ctx
238        );
239        self.sequence_staged(ctx, Span::current(), stage).await;
240    }
241
242    /// Do some simple validation. We must defer most of it until after any off-thread work.
243    #[instrument]
244    pub fn peek_validate(
245        &self,
246        session: &Session,
247        plan: mz_sql::plan::SelectPlan,
248        target_cluster: TargetCluster,
249        copy_to_ctx: Option<CopyToContext>,
250        explain_ctx: ExplainContext,
251        max_query_result_size: Option<u64>,
252    ) -> Result<PeekStage, AdapterError> {
253        // Collect optimizer parameters.
254        let catalog = self.owned_catalog();
255        let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
256        let compute_instance = self
257            .instance_snapshot(cluster.id())
258            .expect("compute instance does not exist");
259        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
260            .override_from(&self.catalog.get_cluster(cluster.id()).config.features())
261            .override_from(&explain_ctx);
262
263        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
264            return Err(AdapterError::NoClusterReplicasAvailable {
265                name: cluster.name.clone(),
266                is_managed: cluster.is_managed(),
267            });
268        }
269
270        let optimizer = match copy_to_ctx {
271            None => {
272                // Collect optimizer parameters specific to the peek::Optimizer.
273                let (_, view_id) = self.allocate_transient_id();
274                let (_, index_id) = self.allocate_transient_id();
275
276                // Build an optimizer for this SELECT.
277                Either::Left(optimize::peek::Optimizer::new(
278                    Arc::clone(&catalog),
279                    compute_instance,
280                    plan.finishing.clone(),
281                    view_id,
282                    index_id,
283                    optimizer_config,
284                    self.optimizer_metrics(),
285                ))
286            }
287            Some(mut copy_to_ctx) => {
288                // Getting the max worker count across replicas
289                // and using that value for the number of batches to
290                // divide the copy output into.
291                let worker_counts = cluster.replicas().map(|r| {
292                    let loc = &r.config.location;
293                    loc.workers().unwrap_or_else(|| loc.num_processes())
294                });
295                let max_worker_count = match worker_counts.max() {
296                    Some(count) => u64::cast_from(count),
297                    None => {
298                        return Err(AdapterError::NoClusterReplicasAvailable {
299                            name: cluster.name.clone(),
300                            is_managed: cluster.is_managed(),
301                        });
302                    }
303                };
304                copy_to_ctx.output_batch_count = Some(max_worker_count);
305                let (_, view_id) = self.allocate_transient_id();
306                // Build an optimizer for this COPY TO.
307                Either::Right(optimize::copy_to::Optimizer::new(
308                    Arc::clone(&catalog),
309                    compute_instance,
310                    view_id,
311                    copy_to_ctx,
312                    optimizer_config,
313                    self.optimizer_metrics(),
314                ))
315            }
316        };
317
318        let target_replica_name = session.vars().cluster_replica();
319        let mut target_replica = target_replica_name
320            .map(|name| {
321                cluster
322                    .replica_id(name)
323                    .ok_or(AdapterError::UnknownClusterReplica {
324                        cluster_name: cluster.name.clone(),
325                        replica_name: name.to_string(),
326                    })
327            })
328            .transpose()?;
329
330        let source_ids = plan.source.depends_on();
331        let mut timeline_context = self
332            .catalog()
333            .validate_timeline_context(source_ids.iter().copied())?;
334        if matches!(timeline_context, TimelineContext::TimestampIndependent)
335            && plan.source.contains_temporal()?
336        {
337            // If the source IDs are timestamp independent but the query contains temporal functions,
338            // then the timeline context needs to be upgraded to timestamp dependent. This is
339            // required because `source_ids` doesn't contain functions.
340            timeline_context = TimelineContext::TimestampDependent;
341        }
342
343        let notices = check_log_reads(
344            &catalog,
345            cluster,
346            &source_ids,
347            &mut target_replica,
348            session.vars(),
349        )?;
350        session.add_notices(notices);
351
352        let dependencies = source_ids
353            .iter()
354            .map(|id| self.catalog.resolve_item_id(id))
355            .collect();
356        let validity = PlanValidity::new(
357            catalog.transient_revision(),
358            dependencies,
359            Some(cluster.id()),
360            target_replica,
361            session.role_metadata().clone(),
362        );
363
364        Ok(PeekStage::LinearizeTimestamp(PeekStageLinearizeTimestamp {
365            validity,
366            plan,
367            max_query_result_size,
368            source_ids,
369            target_replica,
370            timeline_context,
371            optimizer,
372            explain_ctx,
373        }))
374    }
375
376    /// Possibly linearize a timestamp from a `TimestampOracle`.
377    #[instrument]
378    async fn peek_linearize_timestamp(
379        &self,
380        session: &Session,
381        PeekStageLinearizeTimestamp {
382            validity,
383            source_ids,
384            plan,
385            max_query_result_size,
386            target_replica,
387            timeline_context,
388            optimizer,
389            explain_ctx,
390        }: PeekStageLinearizeTimestamp,
391    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
392        let isolation_level = session.vars().transaction_isolation().clone();
393        let timeline = Coordinator::get_timeline(&timeline_context);
394        let needs_linearized_read_ts =
395            Coordinator::needs_linearized_read_ts(&isolation_level, &plan.when);
396
397        let build_stage = move |oracle_read_ts: Option<Timestamp>| PeekStageRealTimeRecency {
398            validity,
399            plan,
400            max_query_result_size,
401            source_ids,
402            target_replica,
403            timeline_context,
404            oracle_read_ts,
405            optimizer,
406            explain_ctx,
407        };
408
409        match timeline {
410            Some(timeline) if needs_linearized_read_ts => {
411                let oracle = self.get_timestamp_oracle(&timeline);
412
413                // We ship the timestamp oracle off to an async task, so that we
414                // don't block the main task while we wait.
415
416                let span = Span::current();
417                Ok(StageResult::Handle(mz_ore::task::spawn(
418                    || "linearize timestamp",
419                    async move {
420                        let oracle_read_ts = oracle.read_ts().await;
421                        let stage = build_stage(Some(oracle_read_ts));
422                        let stage = PeekStage::RealTimeRecency(stage);
423                        Ok(Box::new(stage))
424                    }
425                    .instrument(span),
426                )))
427            }
428            Some(_) | None => {
429                let stage = build_stage(None);
430                let stage = PeekStage::RealTimeRecency(stage);
431                Ok(StageResult::Immediate(Box::new(stage)))
432            }
433        }
434    }
435
436    /// Determine a read timestamp and create appropriate read holds.
437    #[instrument]
438    fn peek_timestamp_read_hold(
439        &mut self,
440        session: &mut Session,
441        PeekStageTimestampReadHold {
442            mut validity,
443            plan,
444            max_query_result_size,
445            source_ids,
446            target_replica,
447            timeline_context,
448            oracle_read_ts,
449            real_time_recency_ts,
450            optimizer,
451            explain_ctx,
452        }: PeekStageTimestampReadHold,
453    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
454        let cluster_id = match optimizer.as_ref() {
455            Either::Left(optimizer) => optimizer.cluster_id(),
456            Either::Right(optimizer) => optimizer.cluster_id(),
457        };
458        let id_bundle = self
459            .dataflow_builder(cluster_id)
460            .sufficient_collections(source_ids.iter().copied());
461
462        // Although we have added `sources.depends_on()` to the validity already, also add the
463        // sufficient collections for safety.
464        let item_ids = id_bundle
465            .iter()
466            .map(|id| self.catalog().resolve_item_id(&id));
467        validity.extend_dependencies(item_ids);
468
469        let determination = self.sequence_peek_timestamp(
470            session,
471            &plan.when,
472            cluster_id,
473            timeline_context,
474            oracle_read_ts,
475            &id_bundle,
476            &source_ids,
477            real_time_recency_ts,
478            (&explain_ctx).into(),
479        )?;
480
481        let stage = PeekStage::Optimize(PeekStageOptimize {
482            validity,
483            plan,
484            max_query_result_size,
485            source_ids,
486            id_bundle,
487            target_replica,
488            determination,
489            optimizer,
490            explain_ctx,
491        });
492        Ok(StageResult::Immediate(Box::new(stage)))
493    }
494
    /// Run the optimizer pipeline for the peek on a blocking task and produce the next stage.
    ///
    /// The statistics oracle and, when plan insights are requested, the compute instance
    /// snapshots are gathered here before the task is spawned. The optimization itself runs
    /// inside `mz_ore::task::spawn_blocking` under the current span.
    ///
    /// The next stage depends on the optimizer flavor and the explain context:
    /// * `COPY TO` optimizer: [`PeekStage::CopyToPreflight`].
    /// * [`ExplainContext::Plan`]: [`PeekStage::ExplainPlan`].
    /// * [`ExplainContext::PlanInsightsNotice`] or [`ExplainContext::None`]:
    ///   [`PeekStage::Finish`].
    /// * [`ExplainContext::Pushdown`]: [`PeekStage::ExplainPushdown`].
    ///
    /// An optimizer error is returned from the task, except for `EXPLAIN BROKEN`
    /// (`explain_ctx.broken`), where it is logged and the explain stage proceeds with a
    /// default `df_meta`.
    #[instrument]
    async fn peek_optimize(
        &self,
        session: &Session,
        PeekStageOptimize {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            id_bundle,
            target_replica,
            determination,
            mut optimizer,
            explain_ctx,
        }: PeekStageOptimize,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        // Generate data structures that can be moved to another task where we will perform possibly
        // expensive optimizations.
        let timestamp_context = determination.timestamp_context.clone();
        // A failure to build the statistics oracle is not fatal: fall back to the empty oracle.
        let stats = self
            .statistics_oracle(session, &source_ids, &timestamp_context.antichain(), true)
            .await
            .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
        let session = session.meta();
        let now = self.catalog().config().now.clone();
        let catalog = self.owned_catalog();
        let mut compute_instances = BTreeMap::new();
        if explain_ctx.needs_plan_insights() {
            // There's a chance for index skew (indexes were created/deleted between stages) from the
            // original plan, but that seems acceptable for insights.
            for cluster in self.catalog().user_clusters() {
                let snapshot = self.instance_snapshot(cluster.id).expect("must exist");
                compute_instances.insert(cluster.name.clone(), snapshot);
            }
        }

        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
            || "optimize peek",
            move || {
                span.in_scope(|| {
                    // The pipeline is a closure so that its `?` early returns are captured in
                    // `pipeline_result` and handled below instead of leaving the task.
                    let pipeline = || -> Result<Either<optimize::peek::GlobalLirPlan, optimize::copy_to::GlobalLirPlan>, AdapterError> {
                        let _dispatch_guard = explain_ctx.dispatch_guard();

                        let raw_expr = plan.source.clone();

                        match optimizer.as_mut() {
                            // Optimize SELECT statement.
                            Either::Left(optimizer) => {
                                // HIR ⇒ MIR lowering and MIR optimization (local)
                                let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
                                // Attach resolved context required to continue the pipeline.
                                let local_mir_plan = local_mir_plan.resolve(timestamp_context.clone(), &session, stats);
                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
                                let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;

                                Ok(Either::Left(global_lir_plan))
                            }
                            // Optimize COPY TO statement.
                            Either::Right(optimizer) => {
                                // HIR ⇒ MIR lowering and MIR optimization (local)
                                let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
                                // Attach resolved context required to continue the pipeline.
                                let local_mir_plan = local_mir_plan.resolve(timestamp_context.clone(), &session, stats);
                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
                                let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;

                                Ok(Either::Right(global_lir_plan))
                            }
                        }
                    };

                    let pipeline_result = pipeline();
                    // Captured right after the pipeline returns, whether it succeeded or not.
                    let optimization_finished_at = now();

                    let stage = match pipeline_result {
                        Ok(Either::Left(global_lir_plan)) => {
                            let optimizer = optimizer.unwrap_left();
                            // Enable fast path cluster calculation for slow path plans.
                            let needs_plan_insights = explain_ctx.needs_plan_insights();
                            // Disable anything that uses the optimizer if we only want the notice and
                            // plan optimization took longer than the threshold. This is to prevent a
                            // situation where optimizing takes a while and there are lots of clusters,
                            // which would delay peek execution by the product of those.
                            let opt_limit = PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
                                .get(catalog.system_config().dyncfgs());
                            let target_instance = catalog
                                .get_cluster(optimizer.cluster_id())
                                .name
                                .clone();
                            let enable_re_optimize =
                                !(matches!(explain_ctx, ExplainContext::PlanInsightsNotice(_))
                                    && optimizer.duration() > opt_limit);
                            // Only built when plan insights were requested; `None` otherwise.
                            let insights_ctx = needs_plan_insights.then(|| PlanInsightsContext {
                                stmt: plan.select.as_deref().map(Clone::clone).map(Statement::Select),
                                raw_expr: plan.source.clone(),
                                catalog,
                                compute_instances,
                                target_instance,
                                metrics: optimizer.metrics().clone(),
                                finishing: optimizer.finishing().clone(),
                                optimizer_config: optimizer.config().clone(),
                                session,
                                timestamp_context,
                                view_id: optimizer.select_id(),
                                index_id: optimizer.index_id(),
                                enable_re_optimize,
                            }).map(Box::new);
                            match explain_ctx {
                                ExplainContext::Plan(explain_ctx) => {
                                    let (_, df_meta, _) = global_lir_plan.unapply();
                                    PeekStage::ExplainPlan(PeekStageExplainPlan {
                                        validity,
                                        optimizer,
                                        df_meta,
                                        explain_ctx,
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
                                    PeekStage::Finish(PeekStageFinish {
                                        validity,
                                        plan,
                                        max_query_result_size,
                                        id_bundle,
                                        target_replica,
                                        source_ids,
                                        determination,
                                        cluster_id: optimizer.cluster_id(),
                                        finishing: optimizer.finishing().clone(),
                                        plan_insights_optimizer_trace: Some(optimizer_trace),
                                        global_lir_plan,
                                        optimization_finished_at,
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::None => PeekStage::Finish(PeekStageFinish {
                                    validity,
                                    plan,
                                    max_query_result_size,
                                    id_bundle,
                                    target_replica,
                                    source_ids,
                                    determination,
                                    cluster_id: optimizer.cluster_id(),
                                    finishing: optimizer.finishing().clone(),
                                    plan_insights_optimizer_trace: None,
                                    global_lir_plan,
                                    optimization_finished_at,
                                    insights_ctx,
                                }),
                                ExplainContext::Pushdown => {
                                    let (plan, _, _) = global_lir_plan.unapply();
                                    // Keep only the source imports that carry operators; a
                                    // fast-path plan contributes none.
                                    let imports = match plan {
                                        PeekPlan::SlowPath(plan) => plan
                                            .desc
                                            .source_imports
                                            .into_iter()
                                            .filter_map(|(id, (desc, _, _upper))| {
                                                desc.arguments.operators.map(|mfp| (id, mfp))
                                            })
                                            .collect(),
                                        PeekPlan::FastPath(_) => BTreeMap::default(),
                                    };
                                    PeekStage::ExplainPushdown(PeekStageExplainPushdown {
                                        validity,
                                        determination,
                                        imports,
                                    })
                                }
                            }
                        }
                        Ok(Either::Right(global_lir_plan)) => {
                            let optimizer = optimizer.unwrap_right();
                            PeekStage::CopyToPreflight(PeekStageCopyTo {
                                validity,
                                optimizer,
                                global_lir_plan,
                                optimization_finished_at,
                                source_ids,
                            })
                        }
                        // Internal optimizer errors are handled differently
                        // depending on the caller.
                        Err(err) => {
                            let Some(optimizer) = optimizer.left() else {
                                // In `COPY TO` contexts, immediately retire the
                                // execution with the error.
                                return Err(err);
                            };
                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
                                // In `sequence_~` contexts, immediately retire the
                                // execution with the error.
                                return Err(err);
                            };

                            if explain_ctx.broken {
                                // In `EXPLAIN BROKEN` contexts, just log the error
                                // and move to the next stage with default
                                // parameters.
                                tracing::error!("error while handling EXPLAIN statement: {}", err);
                                PeekStage::ExplainPlan(PeekStageExplainPlan {
                                    validity,
                                    optimizer,
                                    df_meta: Default::default(),
                                    explain_ctx,
                                    insights_ctx: None,
                                })
                            } else {
                                // In regular `EXPLAIN` contexts, immediately retire
                                // the execution with the error.
                                return Err(err);
                            }
                        }
                    };
                    Ok(Box::new(stage))
                })
            },
        )))
    }
715
716    #[instrument]
717    async fn peek_real_time_recency(
718        &self,
719        session: &Session,
720        PeekStageRealTimeRecency {
721            validity,
722            plan,
723            max_query_result_size,
724            source_ids,
725            target_replica,
726            timeline_context,
727            oracle_read_ts,
728            optimizer,
729            explain_ctx,
730        }: PeekStageRealTimeRecency,
731    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
732        let fut = self
733            .determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied())
734            .await?;
735
736        match fut {
737            Some(fut) => {
738                let span = Span::current();
739                Ok(StageResult::Handle(mz_ore::task::spawn(
740                    || "peek real time recency",
741                    async move {
742                        let real_time_recency_ts = fut.await?;
743                        let stage = PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
744                            validity,
745                            plan,
746                            max_query_result_size,
747                            target_replica,
748                            timeline_context,
749                            source_ids,
750                            optimizer,
751                            explain_ctx,
752                            oracle_read_ts,
753                            real_time_recency_ts: Some(real_time_recency_ts),
754                        });
755                        Ok(Box::new(stage))
756                    }
757                    .instrument(span),
758                )))
759            }
760            None => Ok(StageResult::Immediate(Box::new(
761                PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
762                    validity,
763                    plan,
764                    max_query_result_size,
765                    target_replica,
766                    timeline_context,
767                    source_ids,
768                    optimizer,
769                    explain_ctx,
770                    oracle_read_ts,
771                    real_time_recency_ts: None,
772                }),
773            ))),
774        }
775    }
776
777    #[instrument]
778    async fn peek_finish(
779        &mut self,
780        ctx: &mut ExecuteContext,
781        PeekStageFinish {
782            validity: _,
783            plan,
784            max_query_result_size,
785            id_bundle,
786            target_replica,
787            source_ids,
788            determination,
789            cluster_id,
790            finishing,
791            plan_insights_optimizer_trace,
792            global_lir_plan,
793            optimization_finished_at,
794            insights_ctx,
795        }: PeekStageFinish,
796    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
797        if let Some(id) = ctx.extra.contents() {
798            self.record_statement_lifecycle_event(
799                &id,
800                &StatementLifecycleEvent::OptimizationFinished,
801                optimization_finished_at,
802            );
803        }
804
805        let session = ctx.session_mut();
806        let conn_id = session.conn_id().clone();
807
808        let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
809        let source_arity = typ.arity();
810
811        emit_optimizer_notices(&*self.catalog, &*session, &df_meta.optimizer_notices);
812
813        if let Some(trace) = plan_insights_optimizer_trace {
814            let target_cluster = self.catalog().get_cluster(cluster_id);
815            let features = OptimizerFeatures::from(self.catalog().system_config())
816                .override_from(&target_cluster.config.features());
817            let insights = trace
818                .into_plan_insights(
819                    &features,
820                    &self.catalog().for_session(session),
821                    Some(plan.finishing),
822                    Some(target_cluster),
823                    df_meta,
824                    insights_ctx,
825                )
826                .await?;
827            session.add_notice(AdapterNotice::PlanInsights(insights));
828        }
829
830        let planned_peek = PlannedPeek {
831            plan: peek_plan,
832            determination: determination.clone(),
833            conn_id: conn_id.clone(),
834            intermediate_result_type: typ,
835            source_arity,
836            source_ids,
837        };
838
839        if let Some(transient_index_id) = match &planned_peek.plan {
840            peek::PeekPlan::FastPath(_) => None,
841            peek::PeekPlan::SlowPath(PeekDataflowPlan { id, .. }) => Some(id),
842        } {
843            if let Some(statement_logging_id) = ctx.extra.contents() {
844                self.set_transient_index_id(statement_logging_id, *transient_index_id);
845            }
846        }
847
848        if let Some(uuid) = ctx.extra().contents() {
849            let ts = determination.timestamp_context.timestamp_or_default();
850            let mut transitive_storage_deps = BTreeSet::new();
851            let mut transitive_compute_deps = BTreeSet::new();
852            for item_id in id_bundle
853                .iter()
854                .map(|gid| self.catalog.state().get_entry_by_global_id(&gid).id())
855                .flat_map(|id| self.catalog.state().transitive_uses(id))
856            {
857                let entry = self.catalog.state().get_entry(&item_id);
858                match entry.item() {
859                    // TODO(alter_table): Adding all of the GlobalIds for an object is incorrect.
860                    // For example, this peek may depend on just a single version of a table, but
861                    // we would add dependencies on all versions of said table. Doing this is okay
862                    // for now since we can't yet version tables, but should get fixed.
863                    CatalogItem::Table(_) | CatalogItem::Source(_) => {
864                        transitive_storage_deps.extend(entry.global_ids());
865                    }
866                    // Each catalog item is computed by at most one compute collection at a time,
867                    // which is also the most recent one.
868                    CatalogItem::MaterializedView(_) | CatalogItem::Index(_) => {
869                        transitive_compute_deps.insert(entry.latest_global_id());
870                    }
871                    _ => {}
872                }
873            }
874            self.install_storage_watch_set(
875                conn_id.clone(),
876                transitive_storage_deps,
877                ts,
878                WatchSetResponse::StatementDependenciesReady(
879                    uuid,
880                    StatementLifecycleEvent::StorageDependenciesFinished,
881                ),
882            );
883            self.install_compute_watch_set(
884                conn_id,
885                transitive_compute_deps,
886                ts,
887                WatchSetResponse::StatementDependenciesReady(
888                    uuid,
889                    StatementLifecycleEvent::ComputeDependenciesFinished,
890                ),
891            )
892        }
893
894        let max_result_size = self.catalog().system_config().max_result_size();
895
896        // Implement the peek, and capture the response.
897        let resp = self
898            .implement_peek_plan(
899                ctx.extra_mut(),
900                planned_peek,
901                finishing,
902                cluster_id,
903                target_replica,
904                max_result_size,
905                max_query_result_size,
906            )
907            .await?;
908
909        if ctx.session().vars().emit_timestamp_notice() {
910            let explanation =
911                self.explain_timestamp(ctx.session(), cluster_id, &id_bundle, determination);
912            ctx.session()
913                .add_notice(AdapterNotice::QueryTimestamp { explanation });
914        }
915
916        let resp = match plan.copy_to {
917            None => resp,
918            Some(format) => ExecuteResponse::CopyTo {
919                format,
920                resp: Box::new(resp),
921            },
922        };
923        Ok(StageResult::Response(resp))
924    }
925
926    #[instrument]
927    async fn peek_copy_to_preflight(
928        &self,
929        copy_to: PeekStageCopyTo,
930    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
931        let connection_context = self.connection_context().clone();
932        Ok(StageResult::Handle(mz_ore::task::spawn(
933            || "peek copy to preflight",
934            async {
935                let sinks = &copy_to.global_lir_plan.df_desc().sink_exports;
936                if sinks.len() != 1 {
937                    return Err(AdapterError::Internal(
938                        "expected exactly one copy to s3 sink".into(),
939                    ));
940                }
941                let (sink_id, sink_desc) = sinks
942                    .first_key_value()
943                    .expect("known to be exactly one copy to s3 sink");
944                match &sink_desc.connection {
945                    ComputeSinkConnection::CopyToS3Oneshot(conn) => {
946                        mz_storage_types::sinks::s3_oneshot_sink::preflight(
947                            connection_context,
948                            &conn.aws_connection,
949                            &conn.upload_info,
950                            conn.connection_id,
951                            *sink_id,
952                        )
953                        .await?;
954                        Ok(Box::new(PeekStage::CopyToDataflow(copy_to)))
955                    }
956                    _ => Err(AdapterError::Internal(
957                        "expected copy to s3 oneshot sink".into(),
958                    )),
959                }
960            },
961        )))
962    }
963
964    #[instrument]
965    async fn peek_copy_to_dataflow(
966        &mut self,
967        ctx: &ExecuteContext,
968        PeekStageCopyTo {
969            validity: _,
970            optimizer,
971            global_lir_plan,
972            optimization_finished_at,
973            source_ids,
974        }: PeekStageCopyTo,
975    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
976        if let Some(id) = ctx.extra.contents() {
977            self.record_statement_lifecycle_event(
978                &id,
979                &StatementLifecycleEvent::OptimizationFinished,
980                optimization_finished_at,
981            );
982        }
983
984        let sink_id = global_lir_plan.sink_id();
985        let cluster_id = optimizer.cluster_id();
986
987        let (df_desc, df_meta) = global_lir_plan.unapply();
988
989        emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
990
991        // Callback for the active copy to.
992        let (tx, rx) = oneshot::channel();
993        let active_copy_to = ActiveCopyTo {
994            conn_id: ctx.session().conn_id().clone(),
995            tx,
996            cluster_id,
997            depends_on: source_ids,
998        };
999        // Add metadata for the new COPY TO. CopyTo returns a `ready` future, so it is safe to drop.
1000        drop(
1001            self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
1002                .await,
1003        );
1004
1005        // Ship dataflow.
1006        self.ship_dataflow(df_desc, cluster_id, None).await;
1007
1008        let span = Span::current();
1009        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
1010            || "peek copy to dataflow",
1011            async {
1012                let res = rx.await;
1013                match res {
1014                    Ok(res) => res,
1015                    Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
1016                }
1017            }
1018            .instrument(span),
1019        )))
1020    }
1021
1022    #[instrument]
1023    async fn peek_explain_plan(
1024        &self,
1025        session: &Session,
1026        PeekStageExplainPlan {
1027            optimizer,
1028            insights_ctx,
1029            df_meta,
1030            explain_ctx,
1031            ..
1032        }: PeekStageExplainPlan,
1033    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
1034        let rows = super::super::explain_plan_inner(
1035            session,
1036            self.catalog(),
1037            df_meta,
1038            explain_ctx,
1039            optimizer,
1040            insights_ctx,
1041        )
1042        .await?;
1043
1044        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
1045    }
1046
1047    #[instrument]
1048    async fn peek_explain_pushdown(
1049        &self,
1050        session: &Session,
1051        stage: PeekStageExplainPushdown,
1052    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
1053        let as_of = stage.determination.timestamp_context.antichain();
1054        let mz_now = stage
1055            .determination
1056            .timestamp_context
1057            .timestamp()
1058            .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1059            .unwrap_or_else(ResultSpec::value_all);
1060        let fut = self
1061            .explain_pushdown_future(session, as_of, mz_now, stage.imports)
1062            .await;
1063        let span = Span::current();
1064        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
1065            || "peek explain pushdown",
1066            fut.instrument(span),
1067        )))
1068    }
1069
1070    /// Determines the query timestamp and acquires read holds on dependent sources
1071    /// if necessary.
1072    #[instrument]
1073    pub(super) fn sequence_peek_timestamp(
1074        &mut self,
1075        session: &mut Session,
1076        when: &QueryWhen,
1077        cluster_id: ClusterId,
1078        timeline_context: TimelineContext,
1079        oracle_read_ts: Option<Timestamp>,
1080        source_bundle: &CollectionIdBundle,
1081        source_ids: &BTreeSet<GlobalId>,
1082        real_time_recency_ts: Option<Timestamp>,
1083        requires_linearization: RequireLinearization,
1084    ) -> Result<TimestampDetermination<Timestamp>, AdapterError> {
1085        let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when);
1086        let timedomain_bundle;
1087
1088        // Fetch or generate a timestamp for this query and what the read holds would be if we need to set
1089        // them.
1090        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
1091            // Use the transaction's timestamp if it exists and this isn't an AS OF query.
1092            Some(
1093                determination @ TimestampDetermination {
1094                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
1095                    ..
1096                },
1097            ) if in_immediate_multi_stmt_txn => (determination, None),
1098            _ => {
1099                let determine_bundle = if in_immediate_multi_stmt_txn {
1100                    // In a transaction, determine a timestamp that will be valid for anything in
1101                    // any schema referenced by the first query.
1102                    timedomain_bundle = timedomain_for(
1103                        self.catalog(),
1104                        &self.index_oracle(cluster_id),
1105                        source_ids,
1106                        &timeline_context,
1107                        session.conn_id(),
1108                        cluster_id,
1109                    )?;
1110
1111                    &timedomain_bundle
1112                } else {
1113                    // If not in a transaction, use the source.
1114                    source_bundle
1115                };
1116                let (determination, read_holds) = self.determine_timestamp(
1117                    session,
1118                    determine_bundle,
1119                    when,
1120                    cluster_id,
1121                    &timeline_context,
1122                    oracle_read_ts,
1123                    real_time_recency_ts,
1124                )?;
1125                // We only need read holds if the read depends on a timestamp.
1126                let read_holds = match determination.timestamp_context.timestamp() {
1127                    Some(_ts) => Some(read_holds),
1128                    None => {
1129                        // We don't need the read holds and shouldn't add them
1130                        // to the txn.
1131                        //
1132                        // TODO: Handle this within determine_timestamp.
1133                        drop(read_holds);
1134                        None
1135                    }
1136                };
1137                (determination, read_holds)
1138            }
1139        };
1140
1141        // Always either verify the current statement ids are within the existing
1142        // transaction's read hold set (timedomain), or create the read holds if this is the
1143        // first statement in a transaction (or this is a single statement transaction).
1144        // This must happen even if this is an `AS OF` query as well. There are steps after
1145        // this that happen off thread, so no matter the kind of statement or transaction,
1146        // we must acquire read holds here so they are held until the off-thread work
1147        // returns to the coordinator.
1148
1149        if let Some(txn_reads) = self.txn_read_holds.get(session.conn_id()) {
1150            // Find referenced ids not in the read hold. A reference could be caused by a
1151            // user specifying an object in a different schema than the first query. An
1152            // index could be caused by a CREATE INDEX after the transaction started.
1153            let allowed_id_bundle = txn_reads.id_bundle();
1154
1155            // We don't need the read holds that determine_timestamp acquired
1156            // for us.
1157            drop(read_holds);
1158
1159            let outside = source_bundle.difference(&allowed_id_bundle);
1160            // Queries without a timestamp and timeline can belong to any existing timedomain.
1161            if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
1162                let valid_names =
1163                    allowed_id_bundle.resolve_names(self.catalog(), session.conn_id());
1164                let invalid_names = outside.resolve_names(self.catalog(), session.conn_id());
1165                return Err(AdapterError::RelationOutsideTimeDomain {
1166                    relations: invalid_names,
1167                    names: valid_names,
1168                });
1169            }
1170        } else if let Some(read_holds) = read_holds {
1171            self.store_transaction_read_holds(session.conn_id().clone(), read_holds);
1172        }
1173
1174        // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
1175        // arbitrary and we don't recall why we did it (possibly an error!). Change this to always
1176        // set the transaction ops. Decide and document what our policy should be on AS OF queries.
1177        // Maybe they shouldn't be allowed in transactions at all because it's hard to explain
1178        // what's going on there. This should probably get a small design document.
1179
1180        // We only track the peeks in the session if the query doesn't use AS
1181        // OF or we're inside an explicit transaction. The latter case is
1182        // necessary to support PG's `BEGIN` semantics, whose behavior can
1183        // depend on whether or not reads have occurred in the txn.
1184        let mut transaction_determination = determination.clone();
1185        if when.is_transactional() {
1186            session.add_transaction_ops(TransactionOps::Peeks {
1187                determination: transaction_determination,
1188                cluster_id,
1189                requires_linearization,
1190            })?;
1191        } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
1192            // If the query uses AS OF, then ignore the timestamp.
1193            transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
1194            session.add_transaction_ops(TransactionOps::Peeks {
1195                determination: transaction_determination,
1196                cluster_id,
1197                requires_linearization,
1198            })?;
1199        };
1200
1201        Ok(determination)
1202    }
1203}