mz_adapter/coord/sequencer/inner/peek.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::{BTreeMap, BTreeSet};
use std::str::FromStr;
use std::sync::Arc;

use http::Uri;
use itertools::Either;
use maplit::btreemap;
use mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION;
use mz_catalog::memory::objects::CatalogItem;
use mz_compute_types::sinks::ComputeSinkConnection;
use mz_controller_types::ClusterId;
use mz_expr::{CollectionPlan, ResultSpec};
use mz_ore::cast::CastFrom;
use mz_ore::instrument;
use mz_repr::explain::{ExprHumanizerExt, TransientItem};
use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
use mz_repr::{Datum, GlobalId, RowArena, Timestamp};
use mz_sql::ast::{ExplainStage, Statement};
use mz_sql::catalog::CatalogCluster;
// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
use mz_sql::plan::QueryWhen;
use mz_sql::plan::{self, HirScalarExpr};
use mz_sql::session::metadata::SessionMetadata;
use mz_transform::EmptyStatisticsOracle;
use tokio::sync::oneshot;
use tracing::{Instrument, Span, warn};

use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
use crate::command::ExecuteResponse;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::peek::{self, PeekDataflowPlan, PeekPlan, PlannedPeek};
use crate::coord::sequencer::inner::{check_log_reads, return_if_err};
use crate::coord::timeline::TimelineContext;
use crate::coord::timestamp_selection::{
    TimestampContext, TimestampDetermination, TimestampProvider,
};
use crate::coord::{
    Coordinator, CopyToContext, ExecuteContext, ExplainContext, ExplainPlanContext, Message,
    PeekStage, PeekStageCopyTo, PeekStageExplainPlan, PeekStageExplainPushdown, PeekStageFinish,
    PeekStageLinearizeTimestamp, PeekStageOptimize, PeekStageRealTimeRecency,
    PeekStageTimestampReadHold, PlanValidity, StageResult, Staged, TargetCluster, WatchSetResponse,
};
use crate::error::AdapterError;
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::OptimizerTrace;
use crate::notice::AdapterNotice;
use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
use crate::optimize::{self, Optimize};
use crate::session::{RequireLinearization, Session, TransactionOps, TransactionStatus};
use crate::statement_logging::StatementLifecycleEvent;

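/// The stages of sequencing a peek, run as a state machine: each stage either
/// completes immediately or returns a task handle, and the coordinator
/// re-enters the machine via [`Message::PeekStageReady`] once the off-thread
/// work finishes.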
impl Staged for PeekStage {
    type Ctx = ExecuteContext;

    fn validity(&mut self) -> &mut PlanValidity {
        match self {
            PeekStage::LinearizeTimestamp(stage) => &mut stage.validity,
            PeekStage::RealTimeRecency(stage) => &mut stage.validity,
            PeekStage::TimestampReadHold(stage) => &mut stage.validity,
            PeekStage::Optimize(stage) => &mut stage.validity,
            PeekStage::Finish(stage) => &mut stage.validity,
            PeekStage::ExplainPlan(stage) => &mut stage.validity,
            PeekStage::ExplainPushdown(stage) => &mut stage.validity,
            PeekStage::CopyToPreflight(stage) => &mut stage.validity,
            PeekStage::CopyToDataflow(stage) => &mut stage.validity,
        }
    }

    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut ExecuteContext,
    ) -> Result<StageResult<Box<Self>>, AdapterError> {
        match self {
            PeekStage::LinearizeTimestamp(stage) => {
                coord.peek_linearize_timestamp(ctx.session(), stage).await
            }
            PeekStage::RealTimeRecency(stage) => {
                coord.peek_real_time_recency(ctx.session(), stage).await
            }
            PeekStage::TimestampReadHold(stage) => {
                coord.peek_timestamp_read_hold(ctx.session_mut(), stage)
            }
            PeekStage::Optimize(stage) => coord.peek_optimize(ctx.session(), stage).await,
            PeekStage::Finish(stage) => coord.peek_finish(ctx, stage).await,
            PeekStage::ExplainPlan(stage) => coord.peek_explain_plan(ctx.session(), stage).await,
            PeekStage::ExplainPushdown(stage) => {
                coord.peek_explain_pushdown(ctx.session(), stage).await
            }
            PeekStage::CopyToPreflight(stage) => coord.peek_copy_to_preflight(stage).await,
            PeekStage::CopyToDataflow(stage) => coord.peek_copy_to_dataflow(ctx, stage).await,
        }
    }

    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
        Message::PeekStageReady {
            ctx,
            span,
            stage: self,
        }
    }

    fn cancel_enabled(&self) -> bool {
        true
    }
}

impl Coordinator {
    /// Sequence a peek, determining a timestamp and the most efficient dataflow interaction.
    ///
    /// Peeks are sequenced by assigning a timestamp for evaluation, and then determining and
    /// deploying the most efficient evaluation plan. The peek could evaluate to a constant,
    /// be a simple read out of an existing arrangement, or require a new dataflow to build
    /// the results to return.
    #[instrument]
    pub(crate) async fn sequence_peek(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::SelectPlan,
        target_cluster: TargetCluster,
        max_query_result_size: Option<u64>,
    ) {
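        // If the session asked for plan insights notices, collect an optimizer
        // trace while sequencing so the notice can be built after optimization;
        // otherwise, avoid the tracing overhead entirely.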
        let explain_ctx = if ctx.session().vars().emit_plan_insights_notice() {
            let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
            ExplainContext::PlanInsightsNotice(optimizer_trace)
        } else {
            ExplainContext::None
        };

        let stage = return_if_err!(
            self.peek_validate(
                ctx.session(),
                plan,
                target_cluster,
                None,
                explain_ctx,
                max_query_result_size
            ),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

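    /// Sequence a `COPY ... TO '<url>'` statement: eagerly evaluate and
    /// validate the target URL, then stage the underlying `SELECT` with a
    /// [`CopyToContext`] so it is optimized as a one-shot S3 sink.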
    #[instrument]
    pub(crate) async fn sequence_copy_to(
        &mut self,
        ctx: ExecuteContext,
        plan::CopyToPlan {
            select_plan,
            desc,
            to,
            connection,
            connection_id,
            format,
            max_file_size,
        }: plan::CopyToPlan,
        target_cluster: TargetCluster,
    ) {
        let eval_uri = |to: HirScalarExpr| -> Result<Uri, AdapterError> {
            let style = ExprPrepStyle::OneShot {
                logical_time: EvalTime::NotAvailable,
                session: ctx.session(),
                catalog_state: self.catalog().state(),
            };
            let mut to = to.lower_uncorrelated()?;
            prep_scalar_expr(&mut to, style)?;
            let temp_storage = RowArena::new();
            let evaled = to.eval(&[], &temp_storage)?;
            if evaled == Datum::Null {
                coord_bail!("COPY TO target value cannot be null");
            }
            let to_url = match Uri::from_str(evaled.unwrap_str()) {
                Ok(url) => {
                    if url.scheme_str() != Some("s3") {
                        coord_bail!("only 's3://...' urls are supported as COPY TO target");
                    }
                    url
                }
                Err(e) => coord_bail!("could not parse COPY TO target url: {}", e),
            };
            Ok(to_url)
        };

        let uri = return_if_err!(eval_uri(to), ctx);

        let stage = return_if_err!(
            self.peek_validate(
                ctx.session(),
                select_plan,
                target_cluster,
                Some(CopyToContext {
                    desc,
                    uri,
                    connection,
                    connection_id,
                    format,
                    max_file_size,
                    // This will be set in the `peek_validate` stage below.
                    output_batch_count: None,
                }),
                ExplainContext::None,
                Some(ctx.session().vars().max_query_result_size()),
            ),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

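    /// Sequence an `EXPLAIN` of a `SELECT` statement, collecting the plans
    /// emitted by the optimizer pipeline into an [`OptimizerTrace`] that is
    /// rendered in the final explain stage.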
    #[instrument]
    pub(crate) async fn explain_peek(
        &mut self,
        ctx: ExecuteContext,
        plan::ExplainPlanPlan {
            stage,
            format,
            config,
            explainee,
        }: plan::ExplainPlanPlan,
        target_cluster: TargetCluster,
    ) {
        let plan::Explainee::Statement(stmt) = explainee else {
            // This is currently asserted in the `sequence_explain_plan` code that
            // calls this method.
            unreachable!()
        };
        let plan::ExplaineeStatement::Select { broken, plan, desc } = stmt else {
            // This is currently asserted in the `sequence_explain_plan` code that
            // calls this method.
            unreachable!()
        };

        // Create an OptimizerTrace instance to collect plans emitted when
        // executing the optimizer pipeline.
        let optimizer_trace = OptimizerTrace::new(stage.paths());

        let stage = return_if_err!(
            self.peek_validate(
                ctx.session(),
                plan,
                target_cluster,
                None,
                ExplainContext::Plan(ExplainPlanContext {
                    broken,
                    config,
                    format,
                    stage,
                    replan: None,
                    desc: Some(desc),
                    optimizer_trace,
                }),
                Some(ctx.session().vars().max_query_result_size()),
            ),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

    /// Do some simple validation. We must defer most of it until after any off-thread work.
    #[instrument]
    pub fn peek_validate(
        &self,
        session: &Session,
        plan: mz_sql::plan::SelectPlan,
        target_cluster: TargetCluster,
        copy_to_ctx: Option<CopyToContext>,
        explain_ctx: ExplainContext,
        max_query_result_size: Option<u64>,
    ) -> Result<PeekStage, AdapterError> {
        // Collect optimizer parameters.
        let catalog = self.owned_catalog();
        let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
        let compute_instance = self
            .instance_snapshot(cluster.id())
            .expect("compute instance does not exist");
        let (_, view_id) = self.allocate_transient_id();
        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
            .override_from(&self.catalog.get_cluster(cluster.id()).config.features())
            .override_from(&explain_ctx);

        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
            return Err(AdapterError::NoClusterReplicasAvailable {
                name: cluster.name.clone(),
                is_managed: cluster.is_managed(),
            });
        }

        let optimizer = match copy_to_ctx {
            None => {
                // Collect optimizer parameters specific to the peek::Optimizer.
                let (_, view_id) = self.allocate_transient_id();
                let (_, index_id) = self.allocate_transient_id();

                // Build an optimizer for this SELECT.
                Either::Left(optimize::peek::Optimizer::new(
                    Arc::clone(&catalog),
                    compute_instance,
                    plan.finishing.clone(),
                    view_id,
                    index_id,
                    optimizer_config,
                    self.optimizer_metrics(),
                ))
            }
            Some(mut copy_to_ctx) => {
                // Take the maximum worker count across replicas and use that as
                // the number of batches to divide the copy output into.
                let worker_counts = cluster.replicas().map(|r| {
                    let loc = &r.config.location;
                    loc.workers().unwrap_or_else(|| loc.num_processes())
                });
                let max_worker_count = match worker_counts.max() {
                    Some(count) => u64::cast_from(count),
                    None => {
                        return Err(AdapterError::NoClusterReplicasAvailable {
                            name: cluster.name.clone(),
                            is_managed: cluster.is_managed(),
                        });
                    }
                };
                copy_to_ctx.output_batch_count = Some(max_worker_count);
                // Build an optimizer for this COPY TO.
                Either::Right(optimize::copy_to::Optimizer::new(
                    Arc::clone(&catalog),
                    compute_instance,
                    view_id,
                    copy_to_ctx,
                    optimizer_config,
                    self.optimizer_metrics(),
                ))
            }
        };

        let target_replica_name = session.vars().cluster_replica();
        let mut target_replica = target_replica_name
            .map(|name| {
                cluster
                    .replica_id(name)
                    .ok_or(AdapterError::UnknownClusterReplica {
                        cluster_name: cluster.name.clone(),
                        replica_name: name.to_string(),
                    })
            })
            .transpose()?;

        let source_ids = plan.source.depends_on();
        let mut timeline_context = self
            .catalog()
            .validate_timeline_context(source_ids.iter().copied())?;
        if matches!(timeline_context, TimelineContext::TimestampIndependent)
            && plan.source.contains_temporal()?
        {
            // If the source IDs are timestamp independent but the query contains temporal functions,
            // then the timeline context needs to be upgraded to timestamp dependent. This is
            // required because `source_ids` doesn't contain functions.
            timeline_context = TimelineContext::TimestampDependent;
        }

        let notices = check_log_reads(
            &catalog,
            cluster,
            &source_ids,
            &mut target_replica,
            session.vars(),
        )?;
        session.add_notices(notices);

        let dependencies = source_ids
            .iter()
            .map(|id| self.catalog.resolve_item_id(id))
            .collect();
        let validity = PlanValidity::new(
            catalog.transient_revision(),
            dependencies,
            Some(cluster.id()),
            target_replica,
            session.role_metadata().clone(),
        );

        Ok(PeekStage::LinearizeTimestamp(PeekStageLinearizeTimestamp {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            optimizer,
            explain_ctx,
        }))
    }

    /// Possibly linearize a timestamp from a `TimestampOracle`.
    #[instrument]
    async fn peek_linearize_timestamp(
        &self,
        session: &Session,
        PeekStageLinearizeTimestamp {
            validity,
            source_ids,
            plan,
            max_query_result_size,
            target_replica,
            timeline_context,
            optimizer,
            explain_ctx,
        }: PeekStageLinearizeTimestamp,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let isolation_level = session.vars().transaction_isolation().clone();
        let timeline = Coordinator::get_timeline(&timeline_context);
        let needs_linearized_read_ts =
            Coordinator::needs_linearized_read_ts(&isolation_level, &plan.when);

        let build_stage = move |oracle_read_ts: Option<Timestamp>| PeekStageRealTimeRecency {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            oracle_read_ts,
            optimizer,
            explain_ctx,
        };

        match timeline {
            Some(timeline) if needs_linearized_read_ts => {
                let oracle = self.get_timestamp_oracle(&timeline);

                // We ship the timestamp oracle off to an async task, so that we
                // don't block the main task while we wait.

                let span = Span::current();
                Ok(StageResult::Handle(mz_ore::task::spawn(
                    || "linearize timestamp",
                    async move {
                        let oracle_read_ts = oracle.read_ts().await;
                        let stage = build_stage(Some(oracle_read_ts));
                        let stage = PeekStage::RealTimeRecency(stage);
                        Ok(Box::new(stage))
                    }
                    .instrument(span),
                )))
            }
            Some(_) | None => {
                let stage = build_stage(None);
                let stage = PeekStage::RealTimeRecency(stage);
                Ok(StageResult::Immediate(Box::new(stage)))
            }
        }
    }

    /// Determine a read timestamp and create appropriate read holds.
    #[instrument]
    fn peek_timestamp_read_hold(
        &mut self,
        session: &mut Session,
        PeekStageTimestampReadHold {
            mut validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            oracle_read_ts,
            real_time_recency_ts,
            optimizer,
            explain_ctx,
        }: PeekStageTimestampReadHold,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let cluster_id = match optimizer.as_ref() {
            Either::Left(optimizer) => optimizer.cluster_id(),
            Either::Right(optimizer) => optimizer.cluster_id(),
        };
        let id_bundle = self
            .dataflow_builder(cluster_id)
            .sufficient_collections(source_ids.iter().copied());

        // Although we have already added the dependencies of `plan.source` to the
        // validity, also add the sufficient collections for safety.
        let item_ids = id_bundle
            .iter()
            .map(|id| self.catalog().resolve_item_id(&id));
        validity.extend_dependencies(item_ids);

        let determination = self.sequence_peek_timestamp(
            session,
            &plan.when,
            cluster_id,
            timeline_context,
            oracle_read_ts,
            &id_bundle,
            &source_ids,
            real_time_recency_ts,
            (&explain_ctx).into(),
        )?;

        let stage = PeekStage::Optimize(PeekStageOptimize {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            id_bundle,
            target_replica,
            determination,
            optimizer,
            explain_ctx,
        });
        Ok(StageResult::Immediate(Box::new(stage)))
    }

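    /// Run the optimizer pipeline (HIR ⇒ MIR ⇒ LIR) for the peek in a blocking
    /// task, since optimization can be arbitrarily expensive, and decide which
    /// stage to enter next based on the `ExplainContext`.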
    #[instrument]
    async fn peek_optimize(
        &self,
        session: &Session,
        PeekStageOptimize {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            id_bundle,
            target_replica,
            determination,
            mut optimizer,
            explain_ctx,
        }: PeekStageOptimize,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        // Generate data structures that can be moved to another task where we will perform possibly
        // expensive optimizations.
        let timestamp_context = determination.timestamp_context.clone();
        let stats = self
            .statistics_oracle(session, &source_ids, &timestamp_context.antichain(), true)
            .await
            .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
        let session = session.meta();
        let now = self.catalog().config().now.clone();
        let catalog = self.owned_catalog();
        let mut compute_instances = BTreeMap::new();
        if explain_ctx.needs_plan_insights() {
            // There's a chance for index skew (indexes were created/deleted between stages) from the
            // original plan, but that seems acceptable for insights.
            for cluster in self.catalog().user_clusters() {
                let snapshot = self.instance_snapshot(cluster.id).expect("must exist");
                compute_instances.insert(cluster.name.clone(), snapshot);
            }
        }

        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
            || "optimize peek",
            move || {
                span.in_scope(|| {
                    let pipeline = || -> Result<Either<optimize::peek::GlobalLirPlan, optimize::copy_to::GlobalLirPlan>, AdapterError> {
                        let _dispatch_guard = explain_ctx.dispatch_guard();

                        let raw_expr = plan.source.clone();

                        match optimizer.as_mut() {
                            // Optimize SELECT statement.
                            Either::Left(optimizer) => {
                                // HIR ⇒ MIR lowering and MIR optimization (local)
                                let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
                                // Attach resolved context required to continue the pipeline.
                                let local_mir_plan = local_mir_plan.resolve(timestamp_context.clone(), &session, stats);
                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
                                let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;

                                Ok(Either::Left(global_lir_plan))
                            }
                            // Optimize COPY TO statement.
                            Either::Right(optimizer) => {
                                // HIR ⇒ MIR lowering and MIR optimization (local)
                                let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
                                // Attach resolved context required to continue the pipeline.
                                let local_mir_plan = local_mir_plan.resolve(timestamp_context.clone(), &session, stats);
                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
                                let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;

                                Ok(Either::Right(global_lir_plan))
                            }
                        }
                    };

                    let pipeline_result = pipeline();
                    let optimization_finished_at = now();

                    let stage = match pipeline_result {
                        Ok(Either::Left(global_lir_plan)) => {
                            let optimizer = optimizer.unwrap_left();
                            // Enable fast path cluster calculation for slow path plans.
                            let needs_plan_insights = explain_ctx.needs_plan_insights();
                            // Disable anything that uses the optimizer if we only want the notice and
                            // plan optimization took longer than the threshold. This is to prevent a
                            // situation where optimizing takes a while and there are lots of clusters,
                            // which would delay peek execution by the product of those.
                            let opt_limit = PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
                                .get(catalog.system_config().dyncfgs());
                            let target_instance = catalog
                                .get_cluster(optimizer.cluster_id())
                                .name
                                .clone();
                            let enable_re_optimize =
                                !(matches!(explain_ctx, ExplainContext::PlanInsightsNotice(_))
                                    && optimizer.duration() > opt_limit);
                            let insights_ctx = needs_plan_insights.then(|| PlanInsightsContext {
                                stmt: plan.select.as_deref().map(Clone::clone).map(Statement::Select),
                                raw_expr: plan.source.clone(),
                                catalog,
                                compute_instances,
                                target_instance,
                                metrics: optimizer.metrics().clone(),
                                finishing: optimizer.finishing().clone(),
                                optimizer_config: optimizer.config().clone(),
                                session,
                                timestamp_context,
                                view_id: optimizer.select_id(),
                                index_id: optimizer.index_id(),
                                enable_re_optimize,
                            }).map(Box::new);
                            match explain_ctx {
                                ExplainContext::Plan(explain_ctx) => {
                                    let (_, df_meta, _) = global_lir_plan.unapply();
                                    PeekStage::ExplainPlan(PeekStageExplainPlan {
                                        validity,
                                        optimizer,
                                        df_meta,
                                        explain_ctx,
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
                                    PeekStage::Finish(PeekStageFinish {
                                        validity,
                                        plan,
                                        max_query_result_size,
                                        id_bundle,
                                        target_replica,
                                        source_ids,
                                        determination,
                                        cluster_id: optimizer.cluster_id(),
                                        finishing: optimizer.finishing().clone(),
                                        plan_insights_optimizer_trace: Some(optimizer_trace),
                                        global_lir_plan,
                                        optimization_finished_at,
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::None => PeekStage::Finish(PeekStageFinish {
                                    validity,
                                    plan,
                                    max_query_result_size,
                                    id_bundle,
                                    target_replica,
                                    source_ids,
                                    determination,
                                    cluster_id: optimizer.cluster_id(),
                                    finishing: optimizer.finishing().clone(),
                                    plan_insights_optimizer_trace: None,
                                    global_lir_plan,
                                    optimization_finished_at,
                                    insights_ctx,
                                }),
                                ExplainContext::Pushdown => {
                                    let (plan, _, _) = global_lir_plan.unapply();
                                    let imports = match plan {
                                        PeekPlan::SlowPath(plan) => plan
                                            .desc
                                            .source_imports
                                            .into_iter()
                                            .filter_map(|(id, (desc, _, _upper))| {
                                                desc.arguments.operators.map(|mfp| (id, mfp))
                                            })
                                            .collect(),
                                        PeekPlan::FastPath(_) => BTreeMap::default(),
                                    };
                                    PeekStage::ExplainPushdown(PeekStageExplainPushdown {
                                        validity,
                                        determination,
                                        imports,
                                    })
                                }
                            }
                        }
                        Ok(Either::Right(global_lir_plan)) => {
                            let optimizer = optimizer.unwrap_right();
                            PeekStage::CopyToPreflight(PeekStageCopyTo {
                                validity,
                                optimizer,
                                global_lir_plan,
                                optimization_finished_at,
                                source_ids,
                            })
                        }
                        // Internal optimizer errors are handled differently
                        // depending on the caller.
                        Err(err) => {
                            let Some(optimizer) = optimizer.left() else {
                                // In `COPY TO` contexts, immediately retire the
                                // execution with the error.
                                return Err(err);
                            };
                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
                                // In `sequence_~` contexts, immediately retire the
                                // execution with the error.
                                return Err(err);
                            };

                            if explain_ctx.broken {
                                // In `EXPLAIN BROKEN` contexts, just log the error
                                // and move to the next stage with default
                                // parameters.
                                tracing::error!("error while handling EXPLAIN statement: {}", err);
                                PeekStage::ExplainPlan(PeekStageExplainPlan {
                                    validity,
                                    optimizer,
                                    df_meta: Default::default(),
                                    explain_ctx,
                                    insights_ctx: None,
                                })
                            } else {
                                // In regular `EXPLAIN` contexts, immediately retire
                                // the execution with the error.
                                return Err(err);
                            }
                        }
                    };
                    Ok(Box::new(stage))
                })
            },
        )))
    }

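    /// Determine the real-time recency timestamp for the peek's sources, if
    /// one is required; the wait for real-time recency happens in a separate
    /// task so the coordinator is not blocked.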
    #[instrument]
    async fn peek_real_time_recency(
        &self,
        session: &Session,
        PeekStageRealTimeRecency {
            validity,
            plan,
            max_query_result_size,
            source_ids,
            target_replica,
            timeline_context,
            oracle_read_ts,
            optimizer,
            explain_ctx,
        }: PeekStageRealTimeRecency,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let item_ids: Vec<_> = source_ids
            .iter()
            .map(|gid| self.catalog.resolve_item_id(gid))
            .collect();
        let fut = self
            .determine_real_time_recent_timestamp(session, item_ids.into_iter())
            .await?;

        match fut {
            Some(fut) => {
                let span = Span::current();
                Ok(StageResult::Handle(mz_ore::task::spawn(
                    || "peek real time recency",
                    async move {
                        let real_time_recency_ts = fut.await?;
                        let stage = PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
                            validity,
                            plan,
                            max_query_result_size,
                            target_replica,
                            timeline_context,
                            source_ids,
                            optimizer,
                            explain_ctx,
                            oracle_read_ts,
                            real_time_recency_ts: Some(real_time_recency_ts),
                        });
                        Ok(Box::new(stage))
                    }
                    .instrument(span),
                )))
            }
            None => Ok(StageResult::Immediate(Box::new(
                PeekStage::TimestampReadHold(PeekStageTimestampReadHold {
                    validity,
                    plan,
                    max_query_result_size,
                    target_replica,
                    timeline_context,
                    source_ids,
                    optimizer,
                    explain_ctx,
                    oracle_read_ts,
                    real_time_recency_ts: None,
                }),
            ))),
        }
    }

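    /// Finish sequencing the peek: install statement-lifecycle watch sets,
    /// issue the peek against the chosen cluster and replica, and attach any
    /// requested notices to the session before returning the response.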
    #[instrument]
    async fn peek_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        PeekStageFinish {
            validity: _,
            plan,
            max_query_result_size,
            id_bundle,
            target_replica,
            source_ids,
            determination,
            cluster_id,
            finishing,
            plan_insights_optimizer_trace,
            global_lir_plan,
            optimization_finished_at,
            insights_ctx,
        }: PeekStageFinish,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        if let Some(id) = ctx.extra.contents() {
            self.record_statement_lifecycle_event(
                &id,
                &StatementLifecycleEvent::OptimizationFinished,
                optimization_finished_at,
            );
        }

        let session = ctx.session_mut();
        let conn_id = session.conn_id().clone();

        let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
        let source_arity = typ.arity();

        self.emit_optimizer_notices(&*session, &df_meta.optimizer_notices);

        let target_cluster = self.catalog().get_cluster(cluster_id);

        let features = OptimizerFeatures::from(self.catalog().system_config())
            .override_from(&target_cluster.config.features());

        if let Some(trace) = plan_insights_optimizer_trace {
            let insights = trace
                .into_plan_insights(
                    &features,
                    &self.catalog().for_session(session),
                    Some(plan.finishing),
                    Some(target_cluster),
                    df_meta,
                    insights_ctx,
                )
                .await?;
            session.add_notice(AdapterNotice::PlanInsights(insights));
        }

        let planned_peek = PlannedPeek {
            plan: peek_plan,
            determination: determination.clone(),
            conn_id: conn_id.clone(),
            intermediate_result_type: typ,
            source_arity,
            source_ids,
        };

        if let Some(transient_index_id) = match &planned_peek.plan {
            peek::PeekPlan::FastPath(_) => None,
            peek::PeekPlan::SlowPath(PeekDataflowPlan { id, .. }) => Some(id),
        } {
            if let Some(statement_logging_id) = ctx.extra.contents() {
                self.set_transient_index_id(statement_logging_id, *transient_index_id);
            }
        }

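        // Install watch sets so that statement logging can record when the
        // peek's transitive storage and compute dependencies have advanced to
        // the peek timestamp.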
        if let Some(uuid) = ctx.extra().contents() {
            let ts = determination.timestamp_context.timestamp_or_default();
            let mut transitive_storage_deps = BTreeSet::new();
            let mut transitive_compute_deps = BTreeSet::new();
            for item_id in id_bundle
                .iter()
                .map(|gid| self.catalog.state().get_entry_by_global_id(&gid).id())
                .flat_map(|id| self.catalog.state().transitive_uses(id))
            {
                let entry = self.catalog.state().get_entry(&item_id);
                match entry.item() {
                    // TODO(alter_table): Adding all of the GlobalIds for an object is incorrect.
                    // For example, this peek may depend on just a single version of a table, but
                    // we would add dependencies on all versions of said table. Doing this is okay
                    // for now since we can't yet version tables, but should get fixed.
                    CatalogItem::Table(_) | CatalogItem::Source(_) => {
                        transitive_storage_deps.extend(entry.global_ids());
                    }
                    CatalogItem::MaterializedView(_) | CatalogItem::Index(_) => {
                        transitive_compute_deps.extend(entry.global_ids());
                    }
                    _ => {}
                }
            }
            self.install_storage_watch_set(
                conn_id.clone(),
                transitive_storage_deps,
                ts,
                WatchSetResponse::StatementDependenciesReady(
                    uuid,
                    StatementLifecycleEvent::StorageDependenciesFinished,
                ),
            );
            self.install_compute_watch_set(
                conn_id,
                transitive_compute_deps,
                ts,
                WatchSetResponse::StatementDependenciesReady(
                    uuid,
                    StatementLifecycleEvent::ComputeDependenciesFinished,
                ),
            )
        }

        let max_result_size = self.catalog().system_config().max_result_size();

        // Implement the peek, and capture the response.
        let resp = self
            .implement_peek_plan(
                ctx.extra_mut(),
                planned_peek,
                finishing,
                cluster_id,
                target_replica,
                max_result_size,
                max_query_result_size,
            )
            .await?;

        if ctx.session().vars().emit_timestamp_notice() {
            let explanation =
                self.explain_timestamp(ctx.session(), cluster_id, &id_bundle, determination);
            ctx.session()
                .add_notice(AdapterNotice::QueryTimestamp { explanation });
        }

        let resp = match plan.copy_to {
            None => resp,
            Some(format) => ExecuteResponse::CopyTo {
                format,
                resp: Box::new(resp),
            },
        };
        Ok(StageResult::Response(resp))
    }

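    /// Check that the `COPY TO` dataflow has exactly one S3 oneshot sink and
    /// run that sink's preflight validation in a separate task before the
    /// dataflow is shipped.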
    #[instrument]
    async fn peek_copy_to_preflight(
        &mut self,
        copy_to: PeekStageCopyTo,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let connection_context = self.connection_context().clone();
        Ok(StageResult::Handle(mz_ore::task::spawn(
            || "peek copy to preflight",
            async {
                let sinks = &copy_to.global_lir_plan.df_desc().sink_exports;
                if sinks.len() != 1 {
                    return Err(AdapterError::Internal(
                        "expected exactly one copy to s3 sink".into(),
                    ));
                }
                let (sink_id, sink_desc) = sinks
                    .first_key_value()
                    .expect("known to be exactly one copy to s3 sink");
                match &sink_desc.connection {
                    ComputeSinkConnection::CopyToS3Oneshot(conn) => {
                        mz_storage_types::sinks::s3_oneshot_sink::preflight(
                            connection_context,
                            &conn.aws_connection,
                            &conn.upload_info,
                            conn.connection_id,
                            *sink_id,
                        )
                        .await?;
                        Ok(Box::new(PeekStage::CopyToDataflow(copy_to)))
                    }
                    _ => Err(AdapterError::Internal(
                        "expected copy to s3 oneshot sink".into(),
                    )),
                }
            },
        )))
    }

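    /// Ship the `COPY TO` dataflow and register an [`ActiveCopyTo`] sink; the
    /// returned handle retires the execution once the sink reports completion
    /// or an error.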
    #[instrument]
    async fn peek_copy_to_dataflow(
        &mut self,
        ctx: &ExecuteContext,
        PeekStageCopyTo {
            validity: _,
            optimizer,
            global_lir_plan,
            optimization_finished_at,
            source_ids,
        }: PeekStageCopyTo,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        if let Some(id) = ctx.extra.contents() {
            self.record_statement_lifecycle_event(
                &id,
                &StatementLifecycleEvent::OptimizationFinished,
                optimization_finished_at,
            );
        }

        let sink_id = global_lir_plan.sink_id();
        let cluster_id = optimizer.cluster_id();

        let (df_desc, df_meta) = global_lir_plan.unapply();

        self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);

        // Callback for the active copy to.
        let (tx, rx) = oneshot::channel();
        let active_copy_to = ActiveCopyTo {
            conn_id: ctx.session().conn_id().clone(),
            tx,
            cluster_id,
            depends_on: source_ids,
        };
        // Add metadata for the new COPY TO. For COPY TO sinks,
        // `add_active_compute_sink` returns an already-ready future, so it is
        // safe to drop.
        drop(
            self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
                .await,
        );

        // Ship dataflow.
        self.ship_dataflow(df_desc, cluster_id, None).await;

        let span = Span::current();
        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
            || "peek copy to dataflow",
            async {
                let res = rx.await;
                match res {
                    Ok(res) => res,
                    Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
                }
            }
            .instrument(span),
        )))
    }

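    /// Render the optimizer trace collected while planning into the rows
    /// returned for the `EXPLAIN` statement.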
    #[instrument]
    async fn peek_explain_plan(
        &self,
        session: &Session,
        PeekStageExplainPlan {
            optimizer,
            insights_ctx,
            df_meta,
            explain_ctx:
                ExplainPlanContext {
                    config,
                    format,
                    stage,
                    desc,
                    optimizer_trace,
                    ..
                },
            ..
        }: PeekStageExplainPlan,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let desc = desc.expect("RelationDesc for SelectPlan in EXPLAIN mode");

        let session_catalog = self.catalog().for_session(session);
        let expr_humanizer = {
            let transient_items = btreemap! {
                optimizer.select_id() => TransientItem::new(
                    Some(vec![GlobalId::Explain.to_string()]),
                    Some(desc.iter_names().map(|c| c.to_string()).collect()),
                )
            };
            ExprHumanizerExt::new(transient_items, &session_catalog)
        };

        let finishing = if optimizer.finishing().is_trivial(desc.arity()) {
            None
        } else {
            Some(optimizer.finishing().clone())
        };

        let target_cluster = self.catalog().get_cluster(optimizer.cluster_id());
        let features = optimizer.config().features.clone();

        let rows = optimizer_trace
            .into_rows(
                format,
                &config,
                &features,
                &expr_humanizer,
                finishing,
                Some(target_cluster),
                df_meta,
                stage,
                plan::ExplaineeStatementKind::Select,
                insights_ctx,
            )
            .await?;

        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
    }

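    /// Produce the `EXPLAIN FILTER PUSHDOWN` response for the peek's source
    /// imports at the determined timestamp; the work runs in a separate task
    /// that retires the execution.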
    #[instrument]
    async fn peek_explain_pushdown(
        &self,
        session: &Session,
        stage: PeekStageExplainPushdown,
    ) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
        let as_of = stage.determination.timestamp_context.antichain();
        let mz_now = stage
            .determination
            .timestamp_context
            .timestamp()
            .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
            .unwrap_or_else(ResultSpec::value_all);
        let fut = self
            .render_explain_pushdown_prepare(session, as_of, mz_now, stage.imports)
            .await;
        let span = Span::current();
        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
            || "peek explain pushdown",
            fut.instrument(span),
        )))
    }

    /// Determines the query timestamp and acquires read holds on dependent sources
    /// if necessary.
    #[instrument]
    pub(super) fn sequence_peek_timestamp(
        &mut self,
        session: &mut Session,
        when: &QueryWhen,
        cluster_id: ClusterId,
        timeline_context: TimelineContext,
        oracle_read_ts: Option<Timestamp>,
        source_bundle: &CollectionIdBundle,
        source_ids: &BTreeSet<GlobalId>,
        real_time_recency_ts: Option<Timestamp>,
        requires_linearization: RequireLinearization,
    ) -> Result<TimestampDetermination<Timestamp>, AdapterError> {
        let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when);
        let timedomain_bundle;

        // Fetch or generate a timestamp for this query, and determine what the read
        // holds would be if we need to set them.
        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
            // Use the transaction's timestamp if it exists and this isn't an AS OF query.
            Some(
                determination @ TimestampDetermination {
                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
                    ..
                },
            ) if in_immediate_multi_stmt_txn => (determination, None),
            _ => {
                let determine_bundle = if in_immediate_multi_stmt_txn {
                    // In a transaction, determine a timestamp that will be valid for anything in
                    // any schema referenced by the first query.
                    timedomain_bundle = self.timedomain_for(
                        source_ids,
                        &timeline_context,
                        session.conn_id(),
                        cluster_id,
                    )?;

                    &timedomain_bundle
                } else {
                    // If not in a transaction, use the source.
                    source_bundle
                };
                let (determination, read_holds) = self.determine_timestamp(
                    session,
                    determine_bundle,
                    when,
                    cluster_id,
                    &timeline_context,
                    oracle_read_ts,
                    real_time_recency_ts,
                )?;
                // We only need read holds if the read depends on a timestamp.
                let read_holds = match determination.timestamp_context.timestamp() {
                    Some(_ts) => Some(read_holds),
                    None => {
                        // We don't need the read holds and shouldn't add them
                        // to the txn.
                        //
                        // TODO: Handle this within determine_timestamp.
                        drop(read_holds);
                        None
                    }
                };
                (determination, read_holds)
            }
        };

        // Always either verify that the current statement's IDs are within the existing
        // transaction's read hold set (timedomain), or create the read holds if this is the
        // first statement in a transaction (or this is a single-statement transaction).
        // This must happen even if this is an `AS OF` query. There are steps after
        // this that happen off thread, so no matter the kind of statement or transaction,
        // we must acquire read holds here so they are held until the off-thread work
        // returns to the coordinator.

        if let Some(txn_reads) = self.txn_read_holds.get(session.conn_id()) {
            // Find referenced IDs not in the read hold. A reference could be caused by a
            // user specifying an object in a different schema than the first query. An
            // index could be caused by a CREATE INDEX after the transaction started.
            let allowed_id_bundle = txn_reads.id_bundle();

            // We don't need the read holds that determine_timestamp acquired
            // for us.
            drop(read_holds);

            let outside = source_bundle.difference(&allowed_id_bundle);
            // Queries without a timestamp and timeline can belong to any existing timedomain.
            if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
                let valid_names =
                    self.resolve_collection_id_bundle_names(session, &allowed_id_bundle);
                let invalid_names = self.resolve_collection_id_bundle_names(session, &outside);
                return Err(AdapterError::RelationOutsideTimeDomain {
                    relations: invalid_names,
                    names: valid_names,
                });
            }
        } else if let Some(read_holds) = read_holds {
            self.store_transaction_read_holds(session, read_holds);
        }

        // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
        // arbitrary and we don't recall why we did it (possibly an error!). Change this to always
        // set the transaction ops. Decide and document what our policy should be on AS OF queries.
        // Maybe they shouldn't be allowed in transactions at all because it's hard to explain
        // what's going on there. This should probably get a small design document.

        // We only track the peeks in the session if the query doesn't use AS
        // OF or we're inside an explicit transaction. The latter case is
        // necessary to support PG's `BEGIN` semantics, whose behavior can
        // depend on whether or not reads have occurred in the txn.
        let mut transaction_determination = determination.clone();
        if when.is_transactional() {
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id,
                requires_linearization,
            })?;
        } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
            // If the query uses AS OF, then ignore the timestamp.
            transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id,
                requires_linearization,
            })?;
        };

        Ok(determination)
    }
}