Skip to main content

mz_adapter/coord/sequencer/inner/
peek.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::sync::Arc;
12
13use mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION;
14use mz_compute_types::sinks::ComputeSinkConnection;
15use mz_controller_types::ClusterId;
16use mz_expr::{CollectionPlan, ResultSpec};
17use mz_ore::cast::CastFrom;
18use mz_ore::instrument;
19use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
20use mz_repr::{Datum, GlobalId, Timestamp};
21use mz_sql::ast::{ExplainStage, Statement};
22use mz_sql::catalog::CatalogCluster;
23// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
24use mz_sql::plan::QueryWhen;
25use mz_sql::plan::{self};
26use mz_sql::session::metadata::SessionMetadata;
27use mz_transform::EmptyStatisticsOracle;
28use tokio::sync::oneshot;
29use tracing::warn;
30use tracing::{Instrument, Span};
31
32use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
33use crate::command::ExecuteResponse;
34use crate::coord::id_bundle::CollectionIdBundle;
35use crate::coord::peek::{self, PeekDataflowPlan, PeekPlan, PlannedPeek};
36use crate::coord::sequencer::inner::return_if_err;
37use crate::coord::sequencer::{check_log_reads, emit_optimizer_notices, eval_copy_to_uri};
38use crate::coord::timeline::{TimelineContext, timedomain_for};
39use crate::coord::timestamp_selection::{
40    TimestampContext, TimestampDetermination, TimestampProvider,
41};
42use crate::coord::{
43    Coordinator, CopyToContext, ExecuteContext, ExplainContext, ExplainPlanContext, Message,
44    PeekStage, PeekStageCopyTo, PeekStageExplainPlan, PeekStageExplainPushdown, PeekStageFinish,
45    PeekStageLinearizeTimestamp, PeekStageOptimize, PeekStageRealTimeRecency,
46    PeekStageTimestampReadHold, PlanValidity, StageResult, Staged, TargetCluster,
47};
48use crate::error::AdapterError;
49use crate::explain::insights::PlanInsightsContext;
50use crate::explain::optimizer_trace::OptimizerTrace;
51use crate::notice::AdapterNotice;
52use crate::optimize;
53use crate::session::{RequireLinearization, Session, TransactionOps, TransactionStatus};
54use crate::statement_logging::StatementLifecycleEvent;
55use crate::statement_logging::WatchSetCreation;
56
57impl Staged for PeekStage {
58    type Ctx = ExecuteContext;
59
60    fn validity(&mut self) -> &mut PlanValidity {
61        match self {
62            PeekStage::LinearizeTimestamp(stage) => &mut stage.validity,
63            PeekStage::RealTimeRecency(stage) => &mut stage.validity,
64            PeekStage::TimestampReadHold(stage) => &mut stage.validity,
65            PeekStage::Optimize(stage) => &mut stage.validity,
66            PeekStage::Finish(stage) => &mut stage.validity,
67            PeekStage::ExplainPlan(stage) => &mut stage.validity,
68            PeekStage::ExplainPushdown(stage) => &mut stage.validity,
69            PeekStage::CopyToPreflight(stage) => &mut stage.validity,
70            PeekStage::CopyToDataflow(stage) => &mut stage.validity,
71        }
72    }
73
74    async fn stage(
75        self,
76        coord: &mut Coordinator,
77        ctx: &mut ExecuteContext,
78    ) -> Result<StageResult<Box<Self>>, AdapterError> {
79        match self {
80            PeekStage::LinearizeTimestamp(stage) => {
81                coord.peek_linearize_timestamp(ctx.session(), stage).await
82            }
83            PeekStage::RealTimeRecency(stage) => {
84                coord.peek_real_time_recency(ctx.session(), stage).await
85            }
86            PeekStage::TimestampReadHold(stage) => {
87                coord.peek_timestamp_read_hold(ctx.session_mut(), stage)
88            }
89            PeekStage::Optimize(stage) => coord.peek_optimize(ctx.session(), stage).await,
90            PeekStage::Finish(stage) => coord.peek_finish(ctx, stage).await,
91            PeekStage::ExplainPlan(stage) => coord.peek_explain_plan(ctx.session(), stage).await,
92            PeekStage::ExplainPushdown(stage) => {
93                coord.peek_explain_pushdown(ctx.session(), stage).await
94            }
95            PeekStage::CopyToPreflight(stage) => coord.peek_copy_to_preflight(stage).await,
96            PeekStage::CopyToDataflow(stage) => coord.peek_copy_to_dataflow(ctx, stage).await,
97        }
98    }
99
100    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
101        Message::PeekStageReady {
102            ctx,
103            span,
104            stage: self,
105        }
106    }
107
108    fn cancel_enabled(&self) -> bool {
109        true
110    }
111}
112
113impl Coordinator {
114    /// Sequence a peek, determining a timestamp and the most efficient dataflow interaction.
115    ///
116    /// Peeks are sequenced by assigning a timestamp for evaluation, and then determining and
117    /// deploying the most efficient evaluation plan. The peek could evaluate to a constant,
118    /// be a simple read out of an existing arrangement, or required a new dataflow to build
119    /// the results to return.
120    #[instrument]
121    pub(crate) async fn sequence_peek(
122        &mut self,
123        ctx: ExecuteContext,
124        plan: plan::SelectPlan,
125        target_cluster: TargetCluster,
126        max_query_result_size: Option<u64>,
127    ) {
128        let explain_ctx = if ctx.session().vars().emit_plan_insights_notice() {
129            let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
130            ExplainContext::PlanInsightsNotice(optimizer_trace)
131        } else {
132            ExplainContext::None
133        };
134
135        let stage = return_if_err!(
136            self.peek_validate(
137                ctx.session(),
138                plan,
139                target_cluster,
140                None,
141                explain_ctx,
142                max_query_result_size
143            ),
144            ctx
145        );
146        self.sequence_staged(ctx, Span::current(), stage).await;
147    }
148
149    #[instrument]
150    pub(crate) async fn sequence_copy_to(
151        &mut self,
152        ctx: ExecuteContext,
153        plan::CopyToPlan {
154            select_plan,
155            desc,
156            to,
157            connection,
158            connection_id,
159            format,
160            max_file_size,
161        }: plan::CopyToPlan,
162        target_cluster: TargetCluster,
163    ) {
164        let uri = return_if_err!(
165            eval_copy_to_uri(to, ctx.session(), self.catalog().state()),
166            ctx
167        );
168
169        let stage = return_if_err!(
170            self.peek_validate(
171                ctx.session(),
172                select_plan,
173                target_cluster,
174                Some(CopyToContext {
175                    desc,
176                    uri,
177                    connection,
178                    connection_id,
179                    format,
180                    max_file_size,
181                    // This will be set in `peek_stage_validate` stage below.
182                    output_batch_count: None,
183                }),
184                ExplainContext::None,
185                Some(ctx.session().vars().max_query_result_size()),
186            ),
187            ctx
188        );
189        self.sequence_staged(ctx, Span::current(), stage).await;
190    }
191
192    #[instrument]
193    pub(crate) async fn explain_peek(
194        &mut self,
195        ctx: ExecuteContext,
196        plan::ExplainPlanPlan {
197            stage,
198            format,
199            config,
200            explainee,
201        }: plan::ExplainPlanPlan,
202        target_cluster: TargetCluster,
203    ) {
204        let plan::Explainee::Statement(stmt) = explainee else {
205            // This is currently asserted in the `sequence_explain_plan` code that
206            // calls this method.
207            unreachable!()
208        };
209        let plan::ExplaineeStatement::Select { broken, plan, desc } = stmt else {
210            // This is currently asserted in the `sequence_explain_plan` code that
211            // calls this method.
212            unreachable!()
213        };
214
215        // Create an OptimizerTrace instance to collect plans emitted when
216        // executing the optimizer pipeline.
217        let optimizer_trace = OptimizerTrace::new(stage.paths());
218
219        let stage = return_if_err!(
220            self.peek_validate(
221                ctx.session(),
222                plan,
223                target_cluster,
224                None,
225                ExplainContext::Plan(ExplainPlanContext {
226                    broken,
227                    config,
228                    format,
229                    stage,
230                    replan: None,
231                    desc: Some(desc),
232                    optimizer_trace,
233                }),
234                Some(ctx.session().vars().max_query_result_size()),
235            ),
236            ctx
237        );
238        self.sequence_staged(ctx, Span::current(), stage).await;
239    }
240
241    /// Do some simple validation. We must defer most of it until after any off-thread work.
242    #[instrument]
243    pub fn peek_validate(
244        &self,
245        session: &Session,
246        plan: mz_sql::plan::SelectPlan,
247        target_cluster: TargetCluster,
248        copy_to_ctx: Option<CopyToContext>,
249        explain_ctx: ExplainContext,
250        max_query_result_size: Option<u64>,
251    ) -> Result<PeekStage, AdapterError> {
252        // Collect optimizer parameters.
253        let catalog = self.owned_catalog();
254        let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
255        let compute_instance = self
256            .instance_snapshot(cluster.id())
257            .expect("compute instance does not exist");
258        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
259            .override_from(&self.catalog.get_cluster(cluster.id()).config.features())
260            .override_from(&self.cluster_scoped_optimizer_overrides(cluster.id()))
261            .override_from(&explain_ctx);
262
263        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
264            return Err(AdapterError::NoClusterReplicasAvailable {
265                name: cluster.name.clone(),
266                is_managed: cluster.is_managed(),
267            });
268        }
269
270        let optimizer = match copy_to_ctx {
271            None => {
272                // Collect optimizer parameters specific to the peek::Optimizer.
273                let (_, view_id) = self.allocate_transient_id();
274                let (_, index_id) = self.allocate_transient_id();
275
276                // Build an optimizer for this SELECT.
277                optimize::PeekOptimizer::Select(optimize::peek::Optimizer::new(
278                    Arc::clone(&catalog),
279                    compute_instance,
280                    plan.finishing.clone(),
281                    view_id,
282                    index_id,
283                    optimizer_config,
284                    self.optimizer_metrics(),
285                ))
286            }
287            Some(mut copy_to_ctx) => {
288                // Getting the max worker count across replicas
289                // and using that value for the number of batches to
290                // divide the copy output into.
291                let worker_counts = cluster.replicas().map(|r| {
292                    let loc = &r.config.location;
293                    loc.workers().unwrap_or_else(|| loc.num_processes())
294                });
295                let max_worker_count = match worker_counts.max() {
296                    Some(count) => u64::cast_from(count),
297                    None => {
298                        return Err(AdapterError::NoClusterReplicasAvailable {
299                            name: cluster.name.clone(),
300                            is_managed: cluster.is_managed(),
301                        });
302                    }
303                };
304                copy_to_ctx.output_batch_count = Some(max_worker_count);
305                let (_, view_id) = self.allocate_transient_id();
306                // Build an optimizer for this COPY TO.
307                optimize::PeekOptimizer::CopyTo(optimize::copy_to::Optimizer::new(
308                    Arc::clone(&catalog),
309                    compute_instance,
310                    view_id,
311                    copy_to_ctx,
312                    optimizer_config,
313                    self.optimizer_metrics(),
314                ))
315            }
316        };
317
318        let target_replica_name = session.vars().cluster_replica();
319        let mut target_replica = target_replica_name
320            .map(|name| {
321                cluster
322                    .replica_id(name)
323                    .ok_or(AdapterError::UnknownClusterReplica {
324                        cluster_name: cluster.name.clone(),
325                        replica_name: name.to_string(),
326                    })
327            })
328            .transpose()?;
329
330        let source_ids = plan.source.depends_on();
331        let mut timeline_context = self
332            .catalog()
333            .validate_timeline_context(source_ids.iter().copied())?;
334        if matches!(timeline_context, TimelineContext::TimestampIndependent)
335            && plan.source.contains_temporal()
336        {
337            // If the source IDs are timestamp independent but the query contains temporal functions,
338            // then the timeline context needs to be upgraded to timestamp dependent. This is
339            // required because `source_ids` doesn't contain functions.
340            timeline_context = TimelineContext::TimestampDependent;
341        }
342
343        let notices = check_log_reads(
344            &catalog,
345            cluster,
346            &source_ids,
347            &mut target_replica,
348            session.vars(),
349        )?;
350        session.add_notices(notices);
351
352        let dependencies = source_ids
353            .iter()
354            .map(|id| self.catalog.resolve_item_id(id))
355            .collect();
356        let validity = PlanValidity::new(
357            catalog.transient_revision(),
358            dependencies,
359            Some(cluster.id()),
360            target_replica,
361            session.role_metadata().clone(),
362        );
363
364        Ok(PeekStage::LinearizeTimestamp(PeekStageLinearizeTimestamp {
365            validity,
366            plan,
367            max_query_result_size,
368            source_ids,
369            target_replica,
370            timeline_context,
371            optimizer,
372            explain_ctx,
373        }))
374    }
375
376    /// Possibly linearize a timestamp from a `TimestampOracle`.
377    #[instrument]
378    async fn peek_linearize_timestamp(
379        &self,
380        session: &Session,
381        PeekStageLinearizeTimestamp {
382            validity,
383            source_ids,
384            plan,
385            max_query_result_size,
386            target_replica,
387            timeline_context,
388            optimizer,
389            explain_ctx,
390        }: PeekStageLinearizeTimestamp,
391    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
392        let isolation_level = session.vars().transaction_isolation().clone();
393        let timeline = Coordinator::get_timeline(&timeline_context);
394        let needs_linearized_read_ts =
395            Coordinator::needs_linearized_read_ts(&isolation_level, &plan.when);
396
397        let build_stage = move |oracle_read_ts: Option<Timestamp>| PeekStageRealTimeRecency {
398            validity,
399            plan,
400            max_query_result_size,
401            source_ids,
402            target_replica,
403            timeline_context,
404            oracle_read_ts,
405            optimizer,
406            explain_ctx,
407        };
408
409        match timeline {
410            Some(timeline) if needs_linearized_read_ts => {
411                let oracle = self.get_timestamp_oracle(&timeline);
412
413                // We ship the timestamp oracle off to an async task, so that we
414                // don't block the main task while we wait.
415
416                let span = Span::current();
417                Ok(StageResult::Handle(mz_ore::task::spawn(
418                    || "linearize timestamp",
419                    async move {
420                        let oracle_read_ts = oracle.read_ts().await;
421                        let stage = build_stage(Some(oracle_read_ts));
422                        let stage = PeekStage::RealTimeRecency(stage);
423                        Ok(Box::new(stage))
424                    }
425                    .instrument(span),
426                )))
427            }
428            Some(_) | None => {
429                let stage = build_stage(None);
430                let stage = PeekStage::RealTimeRecency(stage);
431                Ok(StageResult::Immediate(Box::new(stage)))
432            }
433        }
434    }
435
436    /// Determine a read timestamp and create appropriate read holds.
437    #[instrument]
438    fn peek_timestamp_read_hold(
439        &mut self,
440        session: &mut Session,
441        PeekStageTimestampReadHold {
442            mut validity,
443            plan,
444            max_query_result_size,
445            source_ids,
446            target_replica,
447            timeline_context,
448            oracle_read_ts,
449            real_time_recency_ts,
450            optimizer,
451            explain_ctx,
452        }: PeekStageTimestampReadHold,
453    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
454        let cluster_id = optimizer.cluster_id();
455        let id_bundle = self
456            .dataflow_builder(cluster_id)
457            .sufficient_collections(source_ids.iter().copied());
458
459        // Although we have added `sources.depends_on()` to the validity already, also add the
460        // sufficient collections for safety.
461        let item_ids = id_bundle
462            .iter()
463            .map(|id| self.catalog().resolve_item_id(&id));
464        validity.extend_dependencies(item_ids);
465
466        let determination = self.sequence_peek_timestamp(
467            session,
468            &plan.when,
469            cluster_id,
470            timeline_context,
471            oracle_read_ts,
472            &id_bundle,
473            &source_ids,
474            real_time_recency_ts,
475            (&explain_ctx).into(),
476        )?;
477
478        let stage = PeekStage::Optimize(PeekStageOptimize {
479            validity,
480            plan,
481            max_query_result_size,
482            source_ids,
483            id_bundle,
484            target_replica,
485            determination,
486            optimizer,
487            explain_ctx,
488        });
489        Ok(StageResult::Immediate(Box::new(stage)))
490    }
491
492    #[instrument]
493    async fn peek_optimize(
494        &self,
495        session: &Session,
496        PeekStageOptimize {
497            validity,
498            plan,
499            max_query_result_size,
500            source_ids,
501            id_bundle,
502            target_replica,
503            determination,
504            mut optimizer,
505            explain_ctx,
506        }: PeekStageOptimize,
507    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
508        // Generate data structures that can be moved to another task where we will perform possibly
509        // expensive optimizations.
510        let timestamp_context = determination.timestamp_context.clone();
511        let stats = self
512            .statistics_oracle(session, &source_ids, &timestamp_context.antichain(), true)
513            .await
514            .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
515        let session = session.meta();
516        let now = self.catalog().config().now.clone();
517        let catalog = self.owned_catalog();
518        let mut compute_instances = BTreeMap::new();
519        if explain_ctx.needs_plan_insights() {
520            // There's a chance for index skew (indexes were created/deleted between stages) from the
521            // original plan, but that seems acceptable for insights.
522            for cluster in self.catalog().user_clusters() {
523                let snapshot = self.instance_snapshot(cluster.id).expect("must exist");
524                compute_instances.insert(cluster.name.clone(), snapshot);
525            }
526        }
527
528        let span = Span::current();
529        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
530            || "optimize peek",
531            move || {
532                span.in_scope(|| {
533                    // Run the optimization pipeline. The dispatch guard borrows
534                    // `explain_ctx`, so we scope it in a block to drop it before
535                    // `explain_ctx` is inspected below. A failed optimization is
536                    // captured in `pipeline_result` rather than returned, so that
537                    // `EXPLAIN BROKEN` can still be handled in the match.
538                    let pipeline_result = {
539                        let _dispatch_guard = explain_ctx.dispatch_guard();
540
541                        let raw_expr = plan.source.clone();
542
543                        optimizer
544                            .optimize(raw_expr, timestamp_context.clone(), &session, stats)
545                            .map_err(AdapterError::from)
546                    };
547
548                    let optimization_finished_at = now();
549
550                    let stage = match pipeline_result {
551                        Ok(optimize::PeekGlobalLirPlan::Select(global_lir_plan)) => {
552                            let optimizer =
553                                optimizer.into_select().expect("a SELECT/EXPLAIN optimizer");
554                            // Enable fast path cluster calculation for slow path plans.
555                            let needs_plan_insights = explain_ctx.needs_plan_insights();
556                            // Disable anything that uses the optimizer if we only want the notice and
557                            // plan optimization took longer than the threshold. This is to prevent a
558                            // situation where optimizing takes a while and there a lots of clusters,
559                            // which would delay peek execution by the product of those.
560                            let opt_limit =
561                                PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
562                                    .get(catalog.system_config().dyncfgs());
563                            let target_instance =
564                                catalog.get_cluster(optimizer.cluster_id()).name.clone();
565                            let enable_re_optimize =
566                                !(matches!(explain_ctx, ExplainContext::PlanInsightsNotice(_))
567                                    && optimizer.duration() > opt_limit);
568                            let insights_ctx = needs_plan_insights
569                                .then(|| PlanInsightsContext {
570                                    stmt: plan
571                                        .select
572                                        .as_deref()
573                                        .map(Clone::clone)
574                                        .map(Statement::Select),
575                                    raw_expr: plan.source.clone(),
576                                    catalog,
577                                    compute_instances,
578                                    target_instance,
579                                    metrics: optimizer.metrics().clone(),
580                                    finishing: optimizer.finishing().clone(),
581                                    optimizer_config: optimizer.config().clone(),
582                                    session,
583                                    timestamp_context,
584                                    view_id: optimizer.select_id(),
585                                    index_id: optimizer.index_id(),
586                                    enable_re_optimize,
587                                })
588                                .map(Box::new);
589                            match explain_ctx {
590                                ExplainContext::Plan(explain_ctx) => {
591                                    let (_, df_meta, _) = global_lir_plan.unapply();
592                                    PeekStage::ExplainPlan(PeekStageExplainPlan {
593                                        validity,
594                                        optimizer,
595                                        df_meta,
596                                        explain_ctx,
597                                        insights_ctx,
598                                    })
599                                }
600                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
601                                    PeekStage::Finish(PeekStageFinish {
602                                        validity,
603                                        plan,
604                                        max_query_result_size,
605                                        id_bundle,
606                                        target_replica,
607                                        source_ids,
608                                        determination,
609                                        cluster_id: optimizer.cluster_id(),
610                                        finishing: optimizer.finishing().clone(),
611                                        plan_insights_optimizer_trace: Some(optimizer_trace),
612                                        global_lir_plan,
613                                        optimization_finished_at,
614                                        insights_ctx,
615                                    })
616                                }
617                                ExplainContext::None => PeekStage::Finish(PeekStageFinish {
618                                    validity,
619                                    plan,
620                                    max_query_result_size,
621                                    id_bundle,
622                                    target_replica,
623                                    source_ids,
624                                    determination,
625                                    cluster_id: optimizer.cluster_id(),
626                                    finishing: optimizer.finishing().clone(),
627                                    plan_insights_optimizer_trace: None,
628                                    global_lir_plan,
629                                    optimization_finished_at,
630                                    insights_ctx,
631                                }),
632                                ExplainContext::Pushdown => {
633                                    let (plan, _, _) = global_lir_plan.unapply();
634                                    let imports = match plan {
635                                        PeekPlan::SlowPath(plan) => plan
636                                            .desc
637                                            .source_imports
638                                            .into_iter()
639                                            .filter_map(|(id, import)| {
640                                                import.desc.arguments.operators.map(|mfp| (id, mfp))
641                                            })
642                                            .collect(),
643                                        PeekPlan::FastPath(_) => BTreeMap::default(),
644                                    };
645                                    PeekStage::ExplainPushdown(PeekStageExplainPushdown {
646                                        validity,
647                                        determination,
648                                        imports,
649                                    })
650                                }
651                            }
652                        }
653                        Ok(optimize::PeekGlobalLirPlan::CopyTo(global_lir_plan)) => {
654                            let optimizer = optimizer.into_copy_to().expect("a COPY TO optimizer");
655                            PeekStage::CopyToPreflight(PeekStageCopyTo {
656                                validity,
657                                optimizer,
658                                global_lir_plan,
659                                optimization_finished_at,
660                                source_ids,
661                            })
662                        }
663                        // Internal optimizer errors are handled differently
664                        // depending on the caller.
665                        Err(err) => {
666                            let Some(optimizer) = optimizer.into_select() else {
667                                // In `COPY TO` contexts, immediately retire the
668                                // execution with the error.
669                                return Err(err);
670                            };
671                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
672                                // In `sequence_~` contexts, immediately retire the
673                                // execution with the error.
674                                return Err(err);
675                            };
676
677                            if explain_ctx.broken {
678                                // In `EXPLAIN BROKEN` contexts, just log the error
679                                // and move to the next stage with default
680                                // parameters.
681                                tracing::error!("error while handling EXPLAIN statement: {}", err);
682                                PeekStage::ExplainPlan(PeekStageExplainPlan {
683                                    validity,
684                                    optimizer,
685                                    df_meta: Default::default(),
686                                    explain_ctx,
687                                    insights_ctx: None,
688                                })
689                            } else {
690                                // In regular `EXPLAIN` contexts, immediately retire
691                                // the execution with the error.
692                                return Err(err);
693                            }
694                        }
695                    };
696                    Ok(Box::new(stage))
697                })
698            },
699        )))
700    }
701
702    #[instrument]
703    async fn peek_real_time_recency(
704        &self,
705        session: &Session,
706        PeekStageRealTimeRecency {
707            validity,
708            plan,
709            max_query_result_size,
710            source_ids,
711            target_replica,
712            timeline_context,
713            oracle_read_ts,
714            optimizer,
715            explain_ctx,
716        }: PeekStageRealTimeRecency,
717    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
718        let fut = self
719            .determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied())
720            .await?;
721
722        match fut {
723            Some(fut) => {
724                let catalog = Arc::clone(&self.catalog);
725                let span = Span::current();
726                Ok(StageResult::Handle(mz_ore::task::spawn(
727                    || "peek real time recency",
728                    async move {
729                        let real_time_recency_ts =
730                            Coordinator::await_real_time_recent_timestamp(catalog, fut).await?;
731                        let stage = PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
732                            validity,
733                            plan,
734                            max_query_result_size,
735                            target_replica,
736                            timeline_context,
737                            source_ids,
738                            optimizer,
739                            explain_ctx,
740                            oracle_read_ts,
741                            real_time_recency_ts: Some(real_time_recency_ts),
742                        });
743                        Ok(Box::new(stage))
744                    }
745                    .instrument(span),
746                )))
747            }
748            None => Ok(StageResult::Immediate(Box::new(
749                PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
750                    validity,
751                    plan,
752                    max_query_result_size,
753                    target_replica,
754                    timeline_context,
755                    source_ids,
756                    optimizer,
757                    explain_ctx,
758                    oracle_read_ts,
759                    real_time_recency_ts: None,
760                }),
761            ))),
762        }
763    }
764
765    #[instrument]
766    async fn peek_finish(
767        &mut self,
768        ctx: &mut ExecuteContext,
769        PeekStageFinish {
770            validity: _,
771            plan,
772            max_query_result_size,
773            id_bundle,
774            target_replica,
775            source_ids,
776            determination,
777            cluster_id,
778            finishing,
779            plan_insights_optimizer_trace,
780            global_lir_plan,
781            optimization_finished_at,
782            insights_ctx,
783        }: PeekStageFinish,
784    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
785        if let Some(id) = ctx.extra.contents() {
786            self.record_statement_lifecycle_event(
787                &id,
788                &StatementLifecycleEvent::OptimizationFinished,
789                optimization_finished_at,
790            );
791        }
792
793        let session = ctx.session_mut();
794        let conn_id = session.conn_id().clone();
795
796        let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
797        let source_arity = typ.arity();
798
799        emit_optimizer_notices(&*self.catalog, &*session, &df_meta.optimizer_notices);
800
801        if let Some(trace) = plan_insights_optimizer_trace {
802            let target_cluster = self.catalog().get_cluster(cluster_id);
803            let features = OptimizerFeatures::from(self.catalog().system_config())
804                .override_from(&target_cluster.config.features())
805                .override_from(&self.cluster_scoped_optimizer_overrides(cluster_id));
806            let insights = trace
807                .into_plan_insights(
808                    &features,
809                    &self.catalog().for_session(session),
810                    Some(plan.finishing),
811                    Some(target_cluster),
812                    df_meta,
813                    insights_ctx,
814                )
815                .await?;
816            session.add_notice(AdapterNotice::PlanInsights(insights));
817        }
818
819        let planned_peek = PlannedPeek {
820            plan: peek_plan,
821            determination: determination.clone(),
822            conn_id: conn_id.clone(),
823            intermediate_result_type: typ,
824            source_arity,
825            source_ids,
826        };
827
828        if let Some(transient_index_id) = match &planned_peek.plan {
829            peek::PeekPlan::FastPath(_) => None,
830            peek::PeekPlan::SlowPath(PeekDataflowPlan { id, .. }) => Some(id),
831        } {
832            if let Some(statement_logging_id) = ctx.extra.contents() {
833                self.set_transient_index_id(statement_logging_id, *transient_index_id);
834            }
835        }
836
837        if let Some(logging_id) = ctx.extra().contents() {
838            let watch_set = WatchSetCreation::new(
839                logging_id,
840                self.catalog.state(),
841                &id_bundle,
842                determination.timestamp_context.timestamp_or_default(),
843            );
844            self.install_peek_watch_sets(conn_id.clone(), watch_set).expect("the old peek sequencing re-verifies the dependencies' existence before installing the new watch sets");
845        }
846
847        let max_result_size = self.catalog().system_config().max_result_size();
848
849        // Implement the peek, and capture the response.
850        let resp = self
851            .implement_peek_plan(
852                ctx.extra_mut(),
853                planned_peek,
854                finishing,
855                cluster_id,
856                target_replica,
857                max_result_size,
858                max_query_result_size,
859            )
860            .await?;
861
862        if ctx.session().vars().emit_timestamp_notice() {
863            let explanation = self.explain_timestamp(
864                ctx.session().conn_id(),
865                ctx.session().pcx().wall_time,
866                cluster_id,
867                &id_bundle,
868                determination,
869            );
870            ctx.session()
871                .add_notice(AdapterNotice::QueryTimestamp { explanation });
872        }
873
874        let resp = match plan.copy_to {
875            None => resp,
876            Some(format) => ExecuteResponse::CopyTo {
877                format,
878                resp: Box::new(resp),
879            },
880        };
881        Ok(StageResult::Response(resp))
882    }
883
884    #[instrument]
885    async fn peek_copy_to_preflight(
886        &self,
887        copy_to: PeekStageCopyTo,
888    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
889        let connection_context = self.connection_context().clone();
890        let enforce_external_addresses = mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
891            .get(self.controller.storage.config().config_set());
892        Ok(StageResult::Handle(mz_ore::task::spawn(
893            || "peek copy to preflight",
894            async move {
895                let sinks = &copy_to.global_lir_plan.df_desc().sink_exports;
896                if sinks.len() != 1 {
897                    return Err(AdapterError::Internal(
898                        "expected exactly one copy to s3 sink".into(),
899                    ));
900                }
901                let (sink_id, sink_desc) = sinks
902                    .first_key_value()
903                    .expect("known to be exactly one copy to s3 sink");
904                match &sink_desc.connection {
905                    ComputeSinkConnection::CopyToS3Oneshot(conn) => {
906                        mz_storage_types::sinks::s3_oneshot_sink::preflight(
907                            connection_context,
908                            &conn.aws_connection,
909                            &conn.upload_info,
910                            conn.connection_id,
911                            *sink_id,
912                            enforce_external_addresses,
913                        )
914                        .await?;
915                        Ok(Box::new(PeekStage::CopyToDataflow(copy_to)))
916                    }
917                    _ => Err(AdapterError::Internal(
918                        "expected copy to s3 oneshot sink".into(),
919                    )),
920                }
921            },
922        )))
923    }
924
925    #[instrument]
926    async fn peek_copy_to_dataflow(
927        &mut self,
928        ctx: &ExecuteContext,
929        PeekStageCopyTo {
930            validity: _,
931            optimizer,
932            global_lir_plan,
933            optimization_finished_at,
934            source_ids,
935        }: PeekStageCopyTo,
936    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
937        if let Some(id) = ctx.extra.contents() {
938            self.record_statement_lifecycle_event(
939                &id,
940                &StatementLifecycleEvent::OptimizationFinished,
941                optimization_finished_at,
942            );
943        }
944
945        let sink_id = global_lir_plan.sink_id();
946        let cluster_id = optimizer.cluster_id();
947
948        let (df_desc, df_meta) = global_lir_plan.unapply();
949
950        emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
951
952        // Callback for the active copy to.
953        let (tx, rx) = oneshot::channel();
954        let active_copy_to = ActiveCopyTo {
955            conn_id: ctx.session().conn_id().clone(),
956            tx,
957            cluster_id,
958            depends_on: source_ids,
959        };
960        // Add metadata for the new COPY TO. CopyTo returns a `ready` future, so it is safe to drop.
961        drop(
962            self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
963                .await,
964        );
965
966        // Ship dataflow.
967        self.ship_dataflow(df_desc, cluster_id, None).await;
968
969        let span = Span::current();
970        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
971            || "peek copy to dataflow",
972            async {
973                let res = rx.await;
974                match res {
975                    Ok(res) => res,
976                    Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
977                }
978            }
979            .instrument(span),
980        )))
981    }
982
983    #[instrument]
984    async fn peek_explain_plan(
985        &self,
986        session: &Session,
987        PeekStageExplainPlan {
988            optimizer,
989            insights_ctx,
990            df_meta,
991            explain_ctx,
992            ..
993        }: PeekStageExplainPlan,
994    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
995        let rows = super::super::explain_plan_inner(
996            session,
997            self.catalog(),
998            df_meta,
999            explain_ctx,
1000            optimizer,
1001            insights_ctx,
1002        )
1003        .await?;
1004
1005        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
1006    }
1007
1008    #[instrument]
1009    async fn peek_explain_pushdown(
1010        &self,
1011        session: &Session,
1012        stage: PeekStageExplainPushdown,
1013    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
1014        let as_of = stage.determination.timestamp_context.antichain();
1015        let mz_now = stage
1016            .determination
1017            .timestamp_context
1018            .timestamp()
1019            .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1020            .unwrap_or_else(ResultSpec::value_all);
1021        let fut = self
1022            .explain_pushdown_future(session, as_of, mz_now, stage.imports)
1023            .await;
1024        let span = Span::current();
1025        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
1026            || "peek explain pushdown",
1027            fut.instrument(span),
1028        )))
1029    }
1030
1031    /// Determines the query timestamp and acquires read holds on dependent sources
1032    /// if necessary.
1033    #[instrument]
1034    pub(super) fn sequence_peek_timestamp(
1035        &mut self,
1036        session: &mut Session,
1037        when: &QueryWhen,
1038        cluster_id: ClusterId,
1039        timeline_context: TimelineContext,
1040        oracle_read_ts: Option<Timestamp>,
1041        source_bundle: &CollectionIdBundle,
1042        source_ids: &BTreeSet<GlobalId>,
1043        real_time_recency_ts: Option<Timestamp>,
1044        requires_linearization: RequireLinearization,
1045    ) -> Result<TimestampDetermination, AdapterError> {
1046        let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when);
1047        let timedomain_bundle;
1048
1049        // Fetch or generate a timestamp for this query and what the read holds would be if we need to set
1050        // them.
1051        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
1052            // Use the transaction's timestamp if it exists and this isn't an AS OF query.
1053            Some(
1054                determination @ TimestampDetermination {
1055                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
1056                    ..
1057                },
1058            ) if in_immediate_multi_stmt_txn => (determination, None),
1059            _ => {
1060                let determine_bundle = if in_immediate_multi_stmt_txn {
1061                    // In a transaction, determine a timestamp that will be valid for anything in
1062                    // any schema referenced by the first query.
1063                    timedomain_bundle = timedomain_for(
1064                        self.catalog(),
1065                        &self.index_oracle(cluster_id),
1066                        source_ids,
1067                        &timeline_context,
1068                        session.conn_id(),
1069                        cluster_id,
1070                    )?;
1071
1072                    &timedomain_bundle
1073                } else {
1074                    // If not in a transaction, use the source.
1075                    source_bundle
1076                };
1077                let (determination, read_holds) = self.determine_timestamp(
1078                    session,
1079                    determine_bundle,
1080                    when,
1081                    cluster_id,
1082                    &timeline_context,
1083                    oracle_read_ts,
1084                    real_time_recency_ts,
1085                )?;
1086                // We only need read holds if the read depends on a timestamp.
1087                let read_holds = match determination.timestamp_context.timestamp() {
1088                    Some(_ts) => Some(read_holds),
1089                    None => {
1090                        // We don't need the read holds and shouldn't add them
1091                        // to the txn.
1092                        //
1093                        // TODO: Handle this within determine_timestamp.
1094                        drop(read_holds);
1095                        None
1096                    }
1097                };
1098                (determination, read_holds)
1099            }
1100        };
1101
1102        // Always either verify the current statement ids are within the existing
1103        // transaction's read hold set (timedomain), or create the read holds if this is the
1104        // first statement in a transaction (or this is a single statement transaction).
1105        // This must happen even if this is an `AS OF` query as well. There are steps after
1106        // this that happen off thread, so no matter the kind of statement or transaction,
1107        // we must acquire read holds here so they are held until the off-thread work
1108        // returns to the coordinator.
1109
1110        if let Some(txn_reads) = self.txn_read_holds.get(session.conn_id()) {
1111            // Find referenced ids not in the read hold. A reference could be caused by a
1112            // user specifying an object in a different schema than the first query. An
1113            // index could be caused by a CREATE INDEX after the transaction started.
1114            let allowed_id_bundle = txn_reads.id_bundle();
1115
1116            // We don't need the read holds that determine_timestamp acquired
1117            // for us.
1118            drop(read_holds);
1119
1120            let outside = source_bundle.difference(&allowed_id_bundle);
1121            // Queries without a timestamp and timeline can belong to any existing timedomain.
1122            if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
1123                let valid_names =
1124                    allowed_id_bundle.resolve_names(self.catalog(), session.conn_id());
1125                let invalid_names = outside.resolve_names(self.catalog(), session.conn_id());
1126                return Err(AdapterError::RelationOutsideTimeDomain {
1127                    relations: invalid_names,
1128                    names: valid_names,
1129                });
1130            }
1131        } else if let Some(read_holds) = read_holds {
1132            self.store_transaction_read_holds(session.conn_id().clone(), read_holds);
1133        }
1134
1135        // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
1136        // arbitrary and we don't recall why we did it (possibly an error!). Change this to always
1137        // set the transaction ops. Decide and document what our policy should be on AS OF queries.
1138        // Maybe they shouldn't be allowed in transactions at all because it's hard to explain
1139        // what's going on there. This should probably get a small design document.
1140
1141        // We only track the peeks in the session if the query doesn't use AS
1142        // OF or we're inside an explicit transaction. The latter case is
1143        // necessary to support PG's `BEGIN` semantics, whose behavior can
1144        // depend on whether or not reads have occurred in the txn.
1145        let mut transaction_determination = determination.clone();
1146        if when.is_transactional() {
1147            session.add_transaction_ops(TransactionOps::Peeks {
1148                determination: transaction_determination,
1149                cluster_id,
1150                requires_linearization,
1151            })?;
1152        } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
1153            // If the query uses AS OF, then ignore the timestamp.
1154            transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
1155            session.add_transaction_ops(TransactionOps::Peeks {
1156                determination: transaction_determination,
1157                cluster_id,
1158                requires_linearization,
1159            })?;
1160        };
1161
1162        Ok(determination)
1163    }
1164}