Skip to main content

mz_adapter/coord/sequencer/inner/
peek.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::sync::Arc;
12
13use itertools::Either;
14use mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION;
15use mz_compute_types::sinks::ComputeSinkConnection;
16use mz_controller_types::ClusterId;
17use mz_expr::{CollectionPlan, ResultSpec};
18use mz_ore::cast::CastFrom;
19use mz_ore::instrument;
20use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
21use mz_repr::{Datum, GlobalId, Timestamp};
22use mz_sql::ast::{ExplainStage, Statement};
23use mz_sql::catalog::CatalogCluster;
24// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
25use mz_sql::plan::QueryWhen;
26use mz_sql::plan::{self};
27use mz_sql::session::metadata::SessionMetadata;
28use mz_transform::EmptyStatisticsOracle;
29use tokio::sync::oneshot;
30use tracing::warn;
31use tracing::{Instrument, Span};
32
33use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
34use crate::command::ExecuteResponse;
35use crate::coord::id_bundle::CollectionIdBundle;
36use crate::coord::peek::{self, PeekDataflowPlan, PeekPlan, PlannedPeek};
37use crate::coord::sequencer::inner::return_if_err;
38use crate::coord::sequencer::{check_log_reads, emit_optimizer_notices, eval_copy_to_uri};
39use crate::coord::timeline::{TimelineContext, timedomain_for};
40use crate::coord::timestamp_selection::{
41    TimestampContext, TimestampDetermination, TimestampProvider,
42};
43use crate::coord::{
44    Coordinator, CopyToContext, ExecuteContext, ExplainContext, ExplainPlanContext, Message,
45    PeekStage, PeekStageCopyTo, PeekStageExplainPlan, PeekStageExplainPushdown, PeekStageFinish,
46    PeekStageLinearizeTimestamp, PeekStageOptimize, PeekStageRealTimeRecency,
47    PeekStageTimestampReadHold, PlanValidity, StageResult, Staged, TargetCluster,
48};
49use crate::error::AdapterError;
50use crate::explain::insights::PlanInsightsContext;
51use crate::explain::optimizer_trace::OptimizerTrace;
52use crate::notice::AdapterNotice;
53use crate::optimize::{self, Optimize};
54use crate::session::{RequireLinearization, Session, TransactionOps, TransactionStatus};
55use crate::statement_logging::StatementLifecycleEvent;
56use crate::statement_logging::WatchSetCreation;
57
impl Staged for PeekStage {
    type Ctx = ExecuteContext;

    /// Returns the `PlanValidity` carried by the current stage, so the
    /// sequencer can re-check that the plan's dependencies (catalog items,
    /// cluster, replica, role) are still valid before resuming the peek.
    fn validity(&mut self) -> &mut PlanValidity {
        match self {
            PeekStage::LinearizeTimestamp(stage) => &mut stage.validity,
            PeekStage::RealTimeRecency(stage) => &mut stage.validity,
            PeekStage::TimestampReadHold(stage) => &mut stage.validity,
            PeekStage::Optimize(stage) => &mut stage.validity,
            PeekStage::Finish(stage) => &mut stage.validity,
            PeekStage::ExplainPlan(stage) => &mut stage.validity,
            PeekStage::ExplainPushdown(stage) => &mut stage.validity,
            PeekStage::CopyToPreflight(stage) => &mut stage.validity,
            PeekStage::CopyToDataflow(stage) => &mut stage.validity,
        }
    }

    /// Executes the current stage by dispatching to the corresponding
    /// `Coordinator` method. Each method returns either the next stage
    /// (immediately or via a task handle) or a final response.
    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut ExecuteContext,
    ) -> Result<StageResult<Box<Self>>, AdapterError> {
        match self {
            PeekStage::LinearizeTimestamp(stage) => {
                coord.peek_linearize_timestamp(ctx.session(), stage).await
            }
            PeekStage::RealTimeRecency(stage) => {
                coord.peek_real_time_recency(ctx.session(), stage).await
            }
            PeekStage::TimestampReadHold(stage) => {
                coord.peek_timestamp_read_hold(ctx.session_mut(), stage)
            }
            PeekStage::Optimize(stage) => coord.peek_optimize(ctx.session(), stage).await,
            PeekStage::Finish(stage) => coord.peek_finish(ctx, stage).await,
            PeekStage::ExplainPlan(stage) => coord.peek_explain_plan(ctx.session(), stage).await,
            PeekStage::ExplainPushdown(stage) => {
                coord.peek_explain_pushdown(ctx.session(), stage).await
            }
            PeekStage::CopyToPreflight(stage) => coord.peek_copy_to_preflight(stage).await,
            PeekStage::CopyToDataflow(stage) => coord.peek_copy_to_dataflow(ctx, stage).await,
        }
    }

    /// Wraps this stage in the coordinator message used to re-enqueue it on
    /// the main loop once off-task work for the previous stage completes.
    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
        Message::PeekStageReady {
            ctx,
            span,
            stage: self,
        }
    }

    /// Peeks may be cancelled between any two stages.
    fn cancel_enabled(&self) -> bool {
        true
    }
}
113
114impl Coordinator {
115    /// Sequence a peek, determining a timestamp and the most efficient dataflow interaction.
116    ///
117    /// Peeks are sequenced by assigning a timestamp for evaluation, and then determining and
118    /// deploying the most efficient evaluation plan. The peek could evaluate to a constant,
119    /// be a simple read out of an existing arrangement, or required a new dataflow to build
120    /// the results to return.
121    #[instrument]
122    pub(crate) async fn sequence_peek(
123        &mut self,
124        ctx: ExecuteContext,
125        plan: plan::SelectPlan,
126        target_cluster: TargetCluster,
127        max_query_result_size: Option<u64>,
128    ) {
129        let explain_ctx = if ctx.session().vars().emit_plan_insights_notice() {
130            let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
131            ExplainContext::PlanInsightsNotice(optimizer_trace)
132        } else {
133            ExplainContext::None
134        };
135
136        let stage = return_if_err!(
137            self.peek_validate(
138                ctx.session(),
139                plan,
140                target_cluster,
141                None,
142                explain_ctx,
143                max_query_result_size
144            ),
145            ctx
146        );
147        self.sequence_staged(ctx, Span::current(), stage).await;
148    }
149
150    #[instrument]
151    pub(crate) async fn sequence_copy_to(
152        &mut self,
153        ctx: ExecuteContext,
154        plan::CopyToPlan {
155            select_plan,
156            desc,
157            to,
158            connection,
159            connection_id,
160            format,
161            max_file_size,
162        }: plan::CopyToPlan,
163        target_cluster: TargetCluster,
164    ) {
165        let uri = return_if_err!(
166            eval_copy_to_uri(to, ctx.session(), self.catalog().state()),
167            ctx
168        );
169
170        let stage = return_if_err!(
171            self.peek_validate(
172                ctx.session(),
173                select_plan,
174                target_cluster,
175                Some(CopyToContext {
176                    desc,
177                    uri,
178                    connection,
179                    connection_id,
180                    format,
181                    max_file_size,
182                    // This will be set in `peek_stage_validate` stage below.
183                    output_batch_count: None,
184                }),
185                ExplainContext::None,
186                Some(ctx.session().vars().max_query_result_size()),
187            ),
188            ctx
189        );
190        self.sequence_staged(ctx, Span::current(), stage).await;
191    }
192
193    #[instrument]
194    pub(crate) async fn explain_peek(
195        &mut self,
196        ctx: ExecuteContext,
197        plan::ExplainPlanPlan {
198            stage,
199            format,
200            config,
201            explainee,
202        }: plan::ExplainPlanPlan,
203        target_cluster: TargetCluster,
204    ) {
205        let plan::Explainee::Statement(stmt) = explainee else {
206            // This is currently asserted in the `sequence_explain_plan` code that
207            // calls this method.
208            unreachable!()
209        };
210        let plan::ExplaineeStatement::Select { broken, plan, desc } = stmt else {
211            // This is currently asserted in the `sequence_explain_plan` code that
212            // calls this method.
213            unreachable!()
214        };
215
216        // Create an OptimizerTrace instance to collect plans emitted when
217        // executing the optimizer pipeline.
218        let optimizer_trace = OptimizerTrace::new(stage.paths());
219
220        let stage = return_if_err!(
221            self.peek_validate(
222                ctx.session(),
223                plan,
224                target_cluster,
225                None,
226                ExplainContext::Plan(ExplainPlanContext {
227                    broken,
228                    config,
229                    format,
230                    stage,
231                    replan: None,
232                    desc: Some(desc),
233                    optimizer_trace,
234                }),
235                Some(ctx.session().vars().max_query_result_size()),
236            ),
237            ctx
238        );
239        self.sequence_staged(ctx, Span::current(), stage).await;
240    }
241
    /// Do some simple validation. We must defer most of it until after any off-thread work.
    ///
    /// Resolves the target cluster, builds the appropriate optimizer (a peek
    /// optimizer for plain SELECTs, or a copy-to optimizer when `copy_to_ctx`
    /// is provided), resolves any session-requested replica, determines the
    /// timeline context, and packages everything into the first stage of the
    /// peek pipeline, `PeekStage::LinearizeTimestamp`.
    #[instrument]
    pub fn peek_validate(
        &self,
        session: &Session,
        plan: mz_sql::plan::SelectPlan,
        target_cluster: TargetCluster,
        copy_to_ctx: Option<CopyToContext>,
        explain_ctx: ExplainContext,
        max_query_result_size: Option<u64>,
    ) -> Result<PeekStage, AdapterError> {
        // Collect optimizer parameters.
        let catalog = self.owned_catalog();
        let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
        let compute_instance = self
            .instance_snapshot(cluster.id())
            .expect("compute instance does not exist");
        // Layer config sources: system defaults, then cluster-level feature
        // overrides, then any overrides implied by the explain context.
        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
            .override_from(&self.catalog.get_cluster(cluster.id()).config.features())
            .override_from(&explain_ctx);

        // Reject early if the query actually needs a running replica (e.g. a
        // plain SELECT) but the target cluster has none.
        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
            return Err(AdapterError::NoClusterReplicasAvailable {
                name: cluster.name.clone(),
                is_managed: cluster.is_managed(),
            });
        }

        let optimizer = match copy_to_ctx {
            None => {
                // Collect optimizer parameters specific to the peek::Optimizer.
                let (_, view_id) = self.allocate_transient_id();
                let (_, index_id) = self.allocate_transient_id();

                // Build an optimizer for this SELECT.
                Either::Left(optimize::peek::Optimizer::new(
                    Arc::clone(&catalog),
                    compute_instance,
                    plan.finishing.clone(),
                    view_id,
                    index_id,
                    optimizer_config,
                    self.optimizer_metrics(),
                ))
            }
            Some(mut copy_to_ctx) => {
                // Getting the max worker count across replicas
                // and using that value for the number of batches to
                // divide the copy output into.
                let worker_counts = cluster.replicas().map(|r| {
                    let loc = &r.config.location;
                    loc.workers().unwrap_or_else(|| loc.num_processes())
                });
                let max_worker_count = match worker_counts.max() {
                    Some(count) => u64::cast_from(count),
                    // No replicas at all: a COPY TO cannot run.
                    None => {
                        return Err(AdapterError::NoClusterReplicasAvailable {
                            name: cluster.name.clone(),
                            is_managed: cluster.is_managed(),
                        });
                    }
                };
                copy_to_ctx.output_batch_count = Some(max_worker_count);
                let (_, view_id) = self.allocate_transient_id();
                // Build an optimizer for this COPY TO.
                Either::Right(optimize::copy_to::Optimizer::new(
                    Arc::clone(&catalog),
                    compute_instance,
                    view_id,
                    copy_to_ctx,
                    optimizer_config,
                    self.optimizer_metrics(),
                ))
            }
        };

        // If the session pinned a specific replica (`SET cluster_replica`),
        // resolve it now; an unknown name is a user error.
        let target_replica_name = session.vars().cluster_replica();
        let mut target_replica = target_replica_name
            .map(|name| {
                cluster
                    .replica_id(name)
                    .ok_or(AdapterError::UnknownClusterReplica {
                        cluster_name: cluster.name.clone(),
                        replica_name: name.to_string(),
                    })
            })
            .transpose()?;

        let source_ids = plan.source.depends_on();
        let mut timeline_context = self
            .catalog()
            .validate_timeline_context(source_ids.iter().copied())?;
        if matches!(timeline_context, TimelineContext::TimestampIndependent)
            && plan.source.contains_temporal()?
        {
            // If the source IDs are timestamp independent but the query contains temporal functions,
            // then the timeline context needs to be upgraded to timestamp dependent. This is
            // required because `source_ids` doesn't contain functions.
            timeline_context = TimelineContext::TimestampDependent;
        }

        // Reading from introspection (log) sources may constrain the replica
        // choice and warrants user-facing notices.
        let notices = check_log_reads(
            &catalog,
            cluster,
            &source_ids,
            &mut target_replica,
            session.vars(),
        )?;
        session.add_notices(notices);

        // Record everything the plan depends on so later stages can detect
        // concurrent catalog changes and abort instead of misbehaving.
        let dependencies = source_ids
            .iter()
            .map(|id| self.catalog.resolve_item_id(id))
            .collect();
        let validity = PlanValidity::new(
            catalog.transient_revision(),
            dependencies,
            Some(cluster.id()),
            target_replica,
            session.role_metadata().clone(),
        );

        Ok(PeekStage::LinearizeTimestamp(PeekStageLinearizeTimestamp {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            optimizer,
            explain_ctx,
        }))
    }
375
    /// Possibly linearize a timestamp from a `TimestampOracle`.
    ///
    /// When the session's isolation level together with the query's `WHEN`
    /// clause require a linearized read timestamp (see
    /// `Coordinator::needs_linearized_read_ts`) and the query belongs to a
    /// timeline, the oracle read happens on a spawned task; otherwise the next
    /// stage is produced immediately with no oracle timestamp.
    #[instrument]
    async fn peek_linearize_timestamp(
        &self,
        session: &Session,
        PeekStageLinearizeTimestamp {
            validity,
            source_ids,
            plan,
            max_query_result_size,
            target_replica,
            timeline_context,
            optimizer,
            explain_ctx,
        }: PeekStageLinearizeTimestamp,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let isolation_level = session.vars().transaction_isolation().clone();
        let timeline = Coordinator::get_timeline(&timeline_context);
        let needs_linearized_read_ts =
            Coordinator::needs_linearized_read_ts(&isolation_level, &plan.when);

        // Shared constructor for the next stage; the only variable part is
        // whether we obtained a timestamp from the oracle.
        let build_stage = move |oracle_read_ts: Option<Timestamp>| PeekStageRealTimeRecency {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            oracle_read_ts,
            optimizer,
            explain_ctx,
        };

        match timeline {
            Some(timeline) if needs_linearized_read_ts => {
                let oracle = self.get_timestamp_oracle(&timeline);

                // We ship the timestamp oracle off to an async task, so that we
                // don't block the main task while we wait.

                let span = Span::current();
                Ok(StageResult::Handle(mz_ore::task::spawn(
                    || "linearize timestamp",
                    async move {
                        let oracle_read_ts = oracle.read_ts().await;
                        let stage = build_stage(Some(oracle_read_ts));
                        let stage = PeekStage::RealTimeRecency(stage);
                        Ok(Box::new(stage))
                    }
                    .instrument(span),
                )))
            }
            // No timeline, or linearization not required: proceed immediately.
            Some(_) | None => {
                let stage = build_stage(None);
                let stage = PeekStage::RealTimeRecency(stage);
                Ok(StageResult::Immediate(Box::new(stage)))
            }
        }
    }
435
    /// Determine a read timestamp and create appropriate read holds.
    ///
    /// Computes the bundle of collections sufficient to answer the query on
    /// the target cluster, extends plan validity with those collections,
    /// determines the peek timestamp (acquiring read holds in the process),
    /// and advances to the optimize stage.
    #[instrument]
    fn peek_timestamp_read_hold(
        &mut self,
        session: &mut Session,
        PeekStageTimestampReadHold {
            mut validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            oracle_read_ts,
            real_time_recency_ts,
            optimizer,
            explain_ctx,
        }: PeekStageTimestampReadHold,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        // Both optimizer variants know their target cluster.
        let cluster_id = match optimizer.as_ref() {
            Either::Left(optimizer) => optimizer.cluster_id(),
            Either::Right(optimizer) => optimizer.cluster_id(),
        };
        let id_bundle = self
            .dataflow_builder(cluster_id)
            .sufficient_collections(source_ids.iter().copied());

        // Although we have added `sources.depends_on()` to the validity already, also add the
        // sufficient collections for safety.
        let item_ids = id_bundle
            .iter()
            .map(|id| self.catalog().resolve_item_id(&id));
        validity.extend_dependencies(item_ids);

        let determination = self.sequence_peek_timestamp(
            session,
            &plan.when,
            cluster_id,
            timeline_context,
            oracle_read_ts,
            &id_bundle,
            &source_ids,
            real_time_recency_ts,
            (&explain_ctx).into(),
        )?;

        let stage = PeekStage::Optimize(PeekStageOptimize {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            id_bundle,
            target_replica,
            determination,
            optimizer,
            explain_ctx,
        });
        Ok(StageResult::Immediate(Box::new(stage)))
    }
494
    /// Runs the optimizer pipeline for the peek on a blocking task.
    ///
    /// Gathers everything the (possibly expensive) optimization needs —
    /// statistics, session metadata, an owned catalog, and (for plan insights)
    /// snapshots of all user clusters — then spawns a blocking task that runs
    /// the HIR⇒MIR⇒LIR pipeline and selects the next stage based on the
    /// explain context: finish the peek, render an EXPLAIN, compute pushdown
    /// info, or proceed to COPY TO preflight.
    #[instrument]
    async fn peek_optimize(
        &self,
        session: &Session,
        PeekStageOptimize {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            id_bundle,
            target_replica,
            determination,
            mut optimizer,
            explain_ctx,
        }: PeekStageOptimize,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        // Generate data structures that can be moved to another task where we will perform possibly
        // expensive optimizations.
        let timestamp_context = determination.timestamp_context.clone();
        // Fall back to empty statistics rather than failing the query if the
        // statistics oracle is unavailable.
        let stats = self
            .statistics_oracle(session, &source_ids, &timestamp_context.antichain(), true)
            .await
            .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
        let session = session.meta();
        let now = self.catalog().config().now.clone();
        let catalog = self.owned_catalog();
        let mut compute_instances = BTreeMap::new();
        if explain_ctx.needs_plan_insights() {
            // There's a chance for index skew (indexes were created/deleted between stages) from the
            // original plan, but that seems acceptable for insights.
            for cluster in self.catalog().user_clusters() {
                let snapshot = self.instance_snapshot(cluster.id).expect("must exist");
                compute_instances.insert(cluster.name.clone(), snapshot);
            }
        }

        // Optimization can be CPU-heavy, so run it on a blocking task instead
        // of the coordinator's main loop.
        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
            || "optimize peek",
            move || {
                span.in_scope(|| {
                    // The pipeline itself: lower and optimize via whichever
                    // optimizer variant (SELECT vs. COPY TO) we carry.
                    let pipeline = || -> Result<
                        Either<
                            optimize::peek::GlobalLirPlan,
                            optimize::copy_to::GlobalLirPlan,
                        >,
                        AdapterError,
                    > {
                        let _dispatch_guard = explain_ctx.dispatch_guard();

                        let raw_expr = plan.source.clone();

                        match optimizer.as_mut() {
                            // Optimize SELECT statement.
                            Either::Left(optimizer) => {
                                // HIR ⇒ MIR lowering and MIR optimization (local)
                                let local_mir_plan =
                                    optimizer.catch_unwind_optimize(raw_expr)?;
                                // Attach resolved context required to continue the pipeline.
                                let local_mir_plan = local_mir_plan.resolve(
                                    timestamp_context.clone(),
                                    &session,
                                    stats,
                                );
                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
                                let global_lir_plan =
                                    optimizer.catch_unwind_optimize(local_mir_plan)?;

                                Ok(Either::Left(global_lir_plan))
                            }
                            // Optimize COPY TO statement.
                            Either::Right(optimizer) => {
                                // HIR ⇒ MIR lowering and MIR optimization (local)
                                let local_mir_plan =
                                    optimizer.catch_unwind_optimize(raw_expr)?;
                                // Attach resolved context required to continue the pipeline.
                                let local_mir_plan = local_mir_plan.resolve(
                                    timestamp_context.clone(),
                                    &session,
                                    stats,
                                );
                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
                                let global_lir_plan =
                                    optimizer.catch_unwind_optimize(local_mir_plan)?;

                                Ok(Either::Right(global_lir_plan))
                            }
                        }
                    };

                    let pipeline_result = pipeline();
                    let optimization_finished_at = now();

                    // Select the next stage from the pipeline result and the
                    // explain context.
                    let stage = match pipeline_result {
                        Ok(Either::Left(global_lir_plan)) => {
                            let optimizer = optimizer.unwrap_left();
                            // Enable fast path cluster calculation for slow path plans.
                            let needs_plan_insights = explain_ctx.needs_plan_insights();
                            // Disable anything that uses the optimizer if we only want the notice and
                            // plan optimization took longer than the threshold. This is to prevent a
                            // situation where optimizing takes a while and there a lots of clusters,
                            // which would delay peek execution by the product of those.
                            let opt_limit =
                                PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
                                    .get(catalog.system_config().dyncfgs());
                            let target_instance =
                                catalog.get_cluster(optimizer.cluster_id()).name.clone();
                            let enable_re_optimize =
                                !(matches!(explain_ctx, ExplainContext::PlanInsightsNotice(_))
                                    && optimizer.duration() > opt_limit);
                            let insights_ctx = needs_plan_insights
                                .then(|| PlanInsightsContext {
                                    stmt: plan
                                        .select
                                        .as_deref()
                                        .map(Clone::clone)
                                        .map(Statement::Select),
                                    raw_expr: plan.source.clone(),
                                    catalog,
                                    compute_instances,
                                    target_instance,
                                    metrics: optimizer.metrics().clone(),
                                    finishing: optimizer.finishing().clone(),
                                    optimizer_config: optimizer.config().clone(),
                                    session,
                                    timestamp_context,
                                    view_id: optimizer.select_id(),
                                    index_id: optimizer.index_id(),
                                    enable_re_optimize,
                                })
                                .map(Box::new);
                            match explain_ctx {
                                // EXPLAIN: keep only the dataflow metadata and
                                // render the plan.
                                ExplainContext::Plan(explain_ctx) => {
                                    let (_, df_meta, _) = global_lir_plan.unapply();
                                    PeekStage::ExplainPlan(PeekStageExplainPlan {
                                        validity,
                                        optimizer,
                                        df_meta,
                                        explain_ctx,
                                        insights_ctx,
                                    })
                                }
                                // Regular peek that also emits the plan-insights
                                // notice: carry the optimizer trace along.
                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
                                    PeekStage::Finish(PeekStageFinish {
                                        validity,
                                        plan,
                                        max_query_result_size,
                                        id_bundle,
                                        target_replica,
                                        source_ids,
                                        determination,
                                        cluster_id: optimizer.cluster_id(),
                                        finishing: optimizer.finishing().clone(),
                                        plan_insights_optimizer_trace: Some(optimizer_trace),
                                        global_lir_plan,
                                        optimization_finished_at,
                                        insights_ctx,
                                    })
                                }
                                // Plain peek: go straight to finish.
                                ExplainContext::None => PeekStage::Finish(PeekStageFinish {
                                    validity,
                                    plan,
                                    max_query_result_size,
                                    id_bundle,
                                    target_replica,
                                    source_ids,
                                    determination,
                                    cluster_id: optimizer.cluster_id(),
                                    finishing: optimizer.finishing().clone(),
                                    plan_insights_optimizer_trace: None,
                                    global_lir_plan,
                                    optimization_finished_at,
                                    insights_ctx,
                                }),
                                // EXPLAIN FILTER PUSHDOWN: collect the MFPs of
                                // source imports (slow path only; fast path
                                // plans have no source imports to push into).
                                ExplainContext::Pushdown => {
                                    let (plan, _, _) = global_lir_plan.unapply();
                                    let imports = match plan {
                                        PeekPlan::SlowPath(plan) => plan
                                            .desc
                                            .source_imports
                                            .into_iter()
                                            .filter_map(|(id, import)| {
                                                import.desc.arguments.operators.map(|mfp| (id, mfp))
                                            })
                                            .collect(),
                                        PeekPlan::FastPath(_) => BTreeMap::default(),
                                    };
                                    PeekStage::ExplainPushdown(PeekStageExplainPushdown {
                                        validity,
                                        determination,
                                        imports,
                                    })
                                }
                            }
                        }
                        Ok(Either::Right(global_lir_plan)) => {
                            let optimizer = optimizer.unwrap_right();
                            PeekStage::CopyToPreflight(PeekStageCopyTo {
                                validity,
                                optimizer,
                                global_lir_plan,
                                optimization_finished_at,
                                source_ids,
                            })
                        }
                        // Internal optimizer errors are handled differently
                        // depending on the caller.
                        Err(err) => {
                            let Some(optimizer) = optimizer.left() else {
                                // In `COPY TO` contexts, immediately retire the
                                // execution with the error.
                                return Err(err);
                            };
                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
                                // In `sequence_~` contexts, immediately retire the
                                // execution with the error.
                                return Err(err);
                            };

                            if explain_ctx.broken {
                                // In `EXPLAIN BROKEN` contexts, just log the error
                                // and move to the next stage with default
                                // parameters.
                                tracing::error!("error while handling EXPLAIN statement: {}", err);
                                PeekStage::ExplainPlan(PeekStageExplainPlan {
                                    validity,
                                    optimizer,
                                    df_meta: Default::default(),
                                    explain_ctx,
                                    insights_ctx: None,
                                })
                            } else {
                                // In regular `EXPLAIN` contexts, immediately retire
                                // the execution with the error.
                                return Err(err);
                            }
                        }
                    };
                    Ok(Box::new(stage))
                })
            },
        )))
    }
738
739    #[instrument]
740    async fn peek_real_time_recency(
741        &self,
742        session: &Session,
743        PeekStageRealTimeRecency {
744            validity,
745            plan,
746            max_query_result_size,
747            source_ids,
748            target_replica,
749            timeline_context,
750            oracle_read_ts,
751            optimizer,
752            explain_ctx,
753        }: PeekStageRealTimeRecency,
754    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
755        let fut = self
756            .determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied())
757            .await?;
758
759        match fut {
760            Some(fut) => {
761                let span = Span::current();
762                Ok(StageResult::Handle(mz_ore::task::spawn(
763                    || "peek real time recency",
764                    async move {
765                        let real_time_recency_ts = fut.await?;
766                        let stage = PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
767                            validity,
768                            plan,
769                            max_query_result_size,
770                            target_replica,
771                            timeline_context,
772                            source_ids,
773                            optimizer,
774                            explain_ctx,
775                            oracle_read_ts,
776                            real_time_recency_ts: Some(real_time_recency_ts),
777                        });
778                        Ok(Box::new(stage))
779                    }
780                    .instrument(span),
781                )))
782            }
783            None => Ok(StageResult::Immediate(Box::new(
784                PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
785                    validity,
786                    plan,
787                    max_query_result_size,
788                    target_replica,
789                    timeline_context,
790                    source_ids,
791                    optimizer,
792                    explain_ctx,
793                    oracle_read_ts,
794                    real_time_recency_ts: None,
795                }),
796            ))),
797        }
798    }
799
    /// Final peek stage: issues the optimized peek plan to the cluster and
    /// produces the `ExecuteResponse` for the client.
    ///
    /// In order, this: records the end of optimization for statement logging,
    /// emits optimizer notices, optionally attaches plan-insights and
    /// query-timestamp notices to the session, installs watch sets for
    /// statement lifecycle logging, and hands the plan to
    /// `implement_peek_plan`. If the original plan was a `COPY ... TO STDOUT`,
    /// the response is wrapped in `ExecuteResponse::CopyTo`.
    #[instrument]
    async fn peek_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        PeekStageFinish {
            validity: _,
            plan,
            max_query_result_size,
            id_bundle,
            target_replica,
            source_ids,
            determination,
            cluster_id,
            finishing,
            plan_insights_optimizer_trace,
            global_lir_plan,
            optimization_finished_at,
            insights_ctx,
        }: PeekStageFinish,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        // Statement logging: mark when optimization completed, if this
        // statement is being logged.
        if let Some(id) = ctx.extra.contents() {
            self.record_statement_lifecycle_event(
                &id,
                &StatementLifecycleEvent::OptimizationFinished,
                optimization_finished_at,
            );
        }

        let session = ctx.session_mut();
        let conn_id = session.conn_id().clone();

        // Unpack the optimized plan into the peek plan proper, the dataflow
        // metadata (notices etc.), and the result type.
        let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
        let source_arity = typ.arity();

        emit_optimizer_notices(&*self.catalog, &*session, &df_meta.optimizer_notices);

        // If plan insights were requested (e.g. by the console), render them
        // from the optimizer trace and attach them as a notice. Note that this
        // consumes `df_meta` and the optimizer trace.
        if let Some(trace) = plan_insights_optimizer_trace {
            let target_cluster = self.catalog().get_cluster(cluster_id);
            let features = OptimizerFeatures::from(self.catalog().system_config())
                .override_from(&target_cluster.config.features());
            let insights = trace
                .into_plan_insights(
                    &features,
                    &self.catalog().for_session(session),
                    Some(plan.finishing),
                    Some(target_cluster),
                    df_meta,
                    insights_ctx,
                )
                .await?;
            session.add_notice(AdapterNotice::PlanInsights(insights));
        }

        let planned_peek = PlannedPeek {
            plan: peek_plan,
            determination: determination.clone(),
            conn_id: conn_id.clone(),
            intermediate_result_type: typ,
            source_arity,
            source_ids,
        };

        // Slow-path peeks create a transient index dataflow; record its ID for
        // statement logging so the frontiers of the transient collection can
        // be tracked. Fast-path peeks have no such dataflow.
        if let Some(transient_index_id) = match &planned_peek.plan {
            peek::PeekPlan::FastPath(_) => None,
            peek::PeekPlan::SlowPath(PeekDataflowPlan { id, .. }) => Some(id),
        } {
            if let Some(statement_logging_id) = ctx.extra.contents() {
                self.set_transient_index_id(statement_logging_id, *transient_index_id);
            }
        }

        // Install watch sets so statement logging can record when the peek's
        // dependencies reach the chosen timestamp.
        if let Some(logging_id) = ctx.extra().contents() {
            let watch_set = WatchSetCreation::new(
                logging_id,
                self.catalog.state(),
                &id_bundle,
                determination.timestamp_context.timestamp_or_default(),
            );
            self.install_peek_watch_sets(conn_id.clone(), watch_set).expect("the old peek sequencing re-verifies the dependencies' existence before installing the new watch sets");
        }

        let max_result_size = self.catalog().system_config().max_result_size();

        // Implement the peek, and capture the response.
        let resp = self
            .implement_peek_plan(
                ctx.extra_mut(),
                planned_peek,
                finishing,
                cluster_id,
                target_replica,
                max_result_size,
                max_query_result_size,
            )
            .await?;

        // If requested via session var, explain the chosen timestamp as a
        // notice alongside the results.
        if ctx.session().vars().emit_timestamp_notice() {
            let explanation = self.explain_timestamp(
                ctx.session().conn_id(),
                ctx.session().pcx().wall_time,
                cluster_id,
                &id_bundle,
                determination,
            );
            ctx.session()
                .add_notice(AdapterNotice::QueryTimestamp { explanation });
        }

        // `COPY ... TO STDOUT` wraps the rows response in a copy-format
        // envelope; plain SELECTs pass through unchanged.
        let resp = match plan.copy_to {
            None => resp,
            Some(format) => ExecuteResponse::CopyTo {
                format,
                resp: Box::new(resp),
            },
        };
        Ok(StageResult::Response(resp))
    }
917
918    #[instrument]
919    async fn peek_copy_to_preflight(
920        &self,
921        copy_to: PeekStageCopyTo,
922    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
923        let connection_context = self.connection_context().clone();
924        Ok(StageResult::Handle(mz_ore::task::spawn(
925            || "peek copy to preflight",
926            async {
927                let sinks = &copy_to.global_lir_plan.df_desc().sink_exports;
928                if sinks.len() != 1 {
929                    return Err(AdapterError::Internal(
930                        "expected exactly one copy to s3 sink".into(),
931                    ));
932                }
933                let (sink_id, sink_desc) = sinks
934                    .first_key_value()
935                    .expect("known to be exactly one copy to s3 sink");
936                match &sink_desc.connection {
937                    ComputeSinkConnection::CopyToS3Oneshot(conn) => {
938                        mz_storage_types::sinks::s3_oneshot_sink::preflight(
939                            connection_context,
940                            &conn.aws_connection,
941                            &conn.upload_info,
942                            conn.connection_id,
943                            *sink_id,
944                        )
945                        .await?;
946                        Ok(Box::new(PeekStage::CopyToDataflow(copy_to)))
947                    }
948                    _ => Err(AdapterError::Internal(
949                        "expected copy to s3 oneshot sink".into(),
950                    )),
951                }
952            },
953        )))
954    }
955
956    #[instrument]
957    async fn peek_copy_to_dataflow(
958        &mut self,
959        ctx: &ExecuteContext,
960        PeekStageCopyTo {
961            validity: _,
962            optimizer,
963            global_lir_plan,
964            optimization_finished_at,
965            source_ids,
966        }: PeekStageCopyTo,
967    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
968        if let Some(id) = ctx.extra.contents() {
969            self.record_statement_lifecycle_event(
970                &id,
971                &StatementLifecycleEvent::OptimizationFinished,
972                optimization_finished_at,
973            );
974        }
975
976        let sink_id = global_lir_plan.sink_id();
977        let cluster_id = optimizer.cluster_id();
978
979        let (df_desc, df_meta) = global_lir_plan.unapply();
980
981        emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
982
983        // Callback for the active copy to.
984        let (tx, rx) = oneshot::channel();
985        let active_copy_to = ActiveCopyTo {
986            conn_id: ctx.session().conn_id().clone(),
987            tx,
988            cluster_id,
989            depends_on: source_ids,
990        };
991        // Add metadata for the new COPY TO. CopyTo returns a `ready` future, so it is safe to drop.
992        drop(
993            self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
994                .await,
995        );
996
997        // Ship dataflow.
998        self.ship_dataflow(df_desc, cluster_id, None).await;
999
1000        let span = Span::current();
1001        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
1002            || "peek copy to dataflow",
1003            async {
1004                let res = rx.await;
1005                match res {
1006                    Ok(res) => res,
1007                    Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
1008                }
1009            }
1010            .instrument(span),
1011        )))
1012    }
1013
1014    #[instrument]
1015    async fn peek_explain_plan(
1016        &self,
1017        session: &Session,
1018        PeekStageExplainPlan {
1019            optimizer,
1020            insights_ctx,
1021            df_meta,
1022            explain_ctx,
1023            ..
1024        }: PeekStageExplainPlan,
1025    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
1026        let rows = super::super::explain_plan_inner(
1027            session,
1028            self.catalog(),
1029            df_meta,
1030            explain_ctx,
1031            optimizer,
1032            insights_ctx,
1033        )
1034        .await?;
1035
1036        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
1037    }
1038
1039    #[instrument]
1040    async fn peek_explain_pushdown(
1041        &self,
1042        session: &Session,
1043        stage: PeekStageExplainPushdown,
1044    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
1045        let as_of = stage.determination.timestamp_context.antichain();
1046        let mz_now = stage
1047            .determination
1048            .timestamp_context
1049            .timestamp()
1050            .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1051            .unwrap_or_else(ResultSpec::value_all);
1052        let fut = self
1053            .explain_pushdown_future(session, as_of, mz_now, stage.imports)
1054            .await;
1055        let span = Span::current();
1056        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
1057            || "peek explain pushdown",
1058            fut.instrument(span),
1059        )))
1060    }
1061
    /// Determines the query timestamp and acquires read holds on dependent sources
    /// if necessary.
    ///
    /// Inside an immediate multi-statement transaction, the transaction's
    /// existing timestamp determination is reused (unless this is an `AS OF`
    /// query); otherwise a fresh timestamp is determined, over the whole
    /// timedomain when in a transaction, or just over `source_bundle` when
    /// not. The resulting read holds are either verified against, or stored
    /// as, the transaction's read-hold set, and the determination is recorded
    /// in the session's transaction ops when applicable.
    #[instrument]
    pub(super) fn sequence_peek_timestamp(
        &mut self,
        session: &mut Session,
        when: &QueryWhen,
        cluster_id: ClusterId,
        timeline_context: TimelineContext,
        oracle_read_ts: Option<Timestamp>,
        source_bundle: &CollectionIdBundle,
        source_ids: &BTreeSet<GlobalId>,
        real_time_recency_ts: Option<Timestamp>,
        requires_linearization: RequireLinearization,
    ) -> Result<TimestampDetermination<Timestamp>, AdapterError> {
        let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when);
        // Declared in this outer scope so a reference to it can outlive the
        // branch below that assigns it.
        let timedomain_bundle;

        // Fetch or generate a timestamp for this query and what the read holds would be if we need to set
        // them.
        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
            // Use the transaction's timestamp if it exists and this isn't an AS OF query.
            Some(
                determination @ TimestampDetermination {
                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
                    ..
                },
            ) if in_immediate_multi_stmt_txn => (determination, None),
            _ => {
                let determine_bundle = if in_immediate_multi_stmt_txn {
                    // In a transaction, determine a timestamp that will be valid for anything in
                    // any schema referenced by the first query.
                    timedomain_bundle = timedomain_for(
                        self.catalog(),
                        &self.index_oracle(cluster_id),
                        source_ids,
                        &timeline_context,
                        session.conn_id(),
                        cluster_id,
                    )?;

                    &timedomain_bundle
                } else {
                    // If not in a transaction, use the source.
                    source_bundle
                };
                let (determination, read_holds) = self.determine_timestamp(
                    session,
                    determine_bundle,
                    when,
                    cluster_id,
                    &timeline_context,
                    oracle_read_ts,
                    real_time_recency_ts,
                )?;
                // We only need read holds if the read depends on a timestamp.
                let read_holds = match determination.timestamp_context.timestamp() {
                    Some(_ts) => Some(read_holds),
                    None => {
                        // We don't need the read holds and shouldn't add them
                        // to the txn.
                        //
                        // TODO: Handle this within determine_timestamp.
                        drop(read_holds);
                        None
                    }
                };
                (determination, read_holds)
            }
        };

        // Always either verify the current statement ids are within the existing
        // transaction's read hold set (timedomain), or create the read holds if this is the
        // first statement in a transaction (or this is a single statement transaction).
        // This must happen even if this is an `AS OF` query as well. There are steps after
        // this that happen off thread, so no matter the kind of statement or transaction,
        // we must acquire read holds here so they are held until the off-thread work
        // returns to the coordinator.

        if let Some(txn_reads) = self.txn_read_holds.get(session.conn_id()) {
            // Find referenced ids not in the read hold. A reference could be caused by a
            // user specifying an object in a different schema than the first query. An
            // index could be caused by a CREATE INDEX after the transaction started.
            let allowed_id_bundle = txn_reads.id_bundle();

            // We don't need the read holds that determine_timestamp acquired
            // for us.
            drop(read_holds);

            let outside = source_bundle.difference(&allowed_id_bundle);
            // Queries without a timestamp and timeline can belong to any existing timedomain.
            if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
                let valid_names =
                    allowed_id_bundle.resolve_names(self.catalog(), session.conn_id());
                let invalid_names = outside.resolve_names(self.catalog(), session.conn_id());
                return Err(AdapterError::RelationOutsideTimeDomain {
                    relations: invalid_names,
                    names: valid_names,
                });
            }
        } else if let Some(read_holds) = read_holds {
            // First read of the transaction (or a single-statement txn):
            // stash the read holds so later statements can be verified
            // against the same timedomain.
            self.store_transaction_read_holds(session.conn_id().clone(), read_holds);
        }

        // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
        // arbitrary and we don't recall why we did it (possibly an error!). Change this to always
        // set the transaction ops. Decide and document what our policy should be on AS OF queries.
        // Maybe they shouldn't be allowed in transactions at all because it's hard to explain
        // what's going on there. This should probably get a small design document.

        // We only track the peeks in the session if the query doesn't use AS
        // OF or we're inside an explicit transaction. The latter case is
        // necessary to support PG's `BEGIN` semantics, whose behavior can
        // depend on whether or not reads have occurred in the txn.
        let mut transaction_determination = determination.clone();
        if when.is_transactional() {
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id,
                requires_linearization,
            })?;
        } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
            // If the query uses AS OF, then ignore the timestamp.
            transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id,
                requires_linearization,
            })?;
        };

        Ok(determination)
    }
1195}