mz_adapter/frontend_peek.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::sync::Arc;

use itertools::Either;
use mz_adapter_types::dyncfgs::CONSTRAINT_BASED_TIMESTAMP_SELECTION;
use mz_adapter_types::timestamp_selection::ConstraintBasedTimestampSelection;
use mz_compute_types::ComputeInstanceId;
use mz_expr::{CollectionPlan, ResultSpec};
use mz_ore::cast::{CastFrom, CastLossy};
use mz_ore::now::EpochMillis;
use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
use mz_repr::role_id::RoleId;
use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
use mz_sql::catalog::CatalogCluster;
use mz_sql::plan::{self, Plan, QueryWhen};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::IsolationLevel;
use mz_sql_parser::ast::{CopyDirection, ExplainStage, Statement};
use mz_transform::EmptyStatisticsOracle;
use mz_transform::dataflow::DataflowMetainfo;
use opentelemetry::trace::TraceContextExt;
use tracing::{Span, debug};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::catalog::CatalogState;
use crate::command::Command;
use crate::coord::peek::PeekPlan;
use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle};
use crate::coord::timeline::timedomain_for;
use crate::coord::timestamp_selection::TimestampDetermination;
use crate::coord::{Coordinator, CopyToContext, ExplainContext, ExplainPlanContext, TargetCluster};
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::OptimizerTrace;
use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
use crate::optimize::{Optimize, OptimizerError};
use crate::session::{Session, TransactionOps, TransactionStatus};
use crate::{
    AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, PeekClient, ReadHolds,
    TimelineContext, TimestampContext, TimestampProvider, optimize,
};
use crate::{coord, metrics};

impl PeekClient {
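    /// Attempts to sequence a peek (`SELECT`, `EXPLAIN SELECT`, `EXPLAIN FILTER PUSHDOWN`,
    /// or `COPY TO`) from the session's frontend task, calling into the coordinator only
    /// for specific sub-tasks (e.g., real-time recency, transaction read holds, slow-path
    /// peek execution). Returns `Ok(None)` when the statement can't be handled here, in
    /// which case the caller falls back to the coordinator's peek sequencing.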
    pub(crate) async fn try_frontend_peek_inner(
        &mut self,
        portal_name: &str,
        session: &mut Session,
    ) -> Result<Option<ExecuteResponse>, AdapterError> {
        if session.vars().emit_timestamp_notice() {
            // TODO(peek-seq): implement this. See end of peek_finish.
            debug!("Bailing out from try_frontend_peek_inner, because emit_timestamp_notice");
            return Ok(None);
        }

        // # From handle_execute

        if session.vars().emit_trace_id_notice() {
            let span_context = tracing::Span::current()
                .context()
                .span()
                .span_context()
                .clone();
            if span_context.is_valid() {
                session.add_notice(AdapterNotice::QueryTrace {
                    trace_id: span_context.trace_id(),
                });
            }
        }

        // TODO(peek-seq): This snapshot is wasted when we end up bailing out from the frontend
        // peek sequencing. We could solve this with the optimization where we continuously keep
        // a catalog snapshot in the session, and only fetch a new one when the catalog revision
        // has changed, which we could detect with an atomic read. But anyhow, this problem will
        // go away once we never fall back to the old sequencing.
        let catalog = self.catalog_snapshot("try_frontend_peek_inner").await;

        if Coordinator::verify_portal(&*catalog, session, portal_name).is_err() {
            // TODO(peek-seq): Don't fall back to the coordinator's peek sequencing here, but
            // retire already.
            debug!(
                "Bailing out from try_frontend_peek_inner, because verify_portal returned an error"
            );
            return Ok(None);
        }

        // TODO(peek-seq): statement logging (and then enable it in various tests)
        let (stmt, params) = {
            let portal = session
                .get_portal_unverified(portal_name)
                // The portal is a session-level thing, so it couldn't have concurrently
                // disappeared since the above verification.
                .expect("called verify_portal above");
            let params = portal.parameters.clone();
            let stmt = portal.stmt.clone();
            (stmt, params)
        };

        let stmt = match stmt {
            Some(stmt) => stmt,
            None => {
                debug!("try_frontend_peek_inner succeeded on an empty query");
                return Ok(Some(ExecuteResponse::EmptyQuery));
            }
        };

        // Before planning, check if this is a statement type we can handle.
        match &*stmt {
            Statement::Select(_)
            | Statement::ExplainAnalyzeObject(_)
            | Statement::ExplainAnalyzeCluster(_) => {
                // These are always fine, just continue.
                // Note: EXPLAIN ANALYZE will `plan` to `Plan::Select`.
            }
            Statement::ExplainPlan(explain_stmt) => {
                // Only handle ExplainPlan for SELECT statements.
                // We don't want to handle e.g. EXPLAIN CREATE MATERIALIZED VIEW here, because
                // that requires purification before planning, which the frontend peek
                // sequencing doesn't do.
                match &explain_stmt.explainee {
                    mz_sql_parser::ast::Explainee::Select(..) => {
                        // This is a SELECT, continue.
                    }
                    _ => {
                        debug!(
                            "Bailing out from try_frontend_peek_inner, because EXPLAIN is not for a SELECT query"
                        );
                        return Ok(None);
                    }
                }
            }
            Statement::ExplainPushdown(explain_stmt) => {
                // Only handle EXPLAIN FILTER PUSHDOWN for SELECT statements.
                match &explain_stmt.explainee {
                    mz_sql_parser::ast::Explainee::Select(..) => {}
                    _ => {
                        debug!(
                            "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query"
                        );
                        return Ok(None);
                    }
                }
            }
            Statement::Copy(copy_stmt) => {
                match &copy_stmt.direction {
                    CopyDirection::To => {
                        // This is COPY TO, continue.
                    }
                    CopyDirection::From => {
                        debug!(
                            "Bailing out from try_frontend_peek_inner, because COPY FROM is not supported"
                        );
                        return Ok(None);
                    }
                }
            }
            _ => {
                debug!(
                    "Bailing out from try_frontend_peek_inner, because statement type is not supported"
                );
                return Ok(None);
            }
        }

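        // Capture metric label values up front, before `stmt` is shadowed by its resolved
        // form below. The labels are only recorded once the peek is known to have been
        // sequenced here (see the `query_total` counter near the end of this function).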
        let session_type = metrics::session_type_label_value(session.user());
        let stmt_type = metrics::statement_type_label_value(&stmt);

        // # From handle_execute_inner

        let conn_catalog = catalog.for_session(session);
        // (`resolved_ids` should be derivable from `stmt`. If `stmt` is later transformed to
        // remove/add IDs, then `resolved_ids` should be updated to also remove/add those IDs.)
        let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?;

        let pcx = session.pcx();
        let plan = mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, &params, &resolved_ids)?;
        let (select_plan, explain_ctx, copy_to_ctx) = match &plan {
            Plan::Select(select_plan) => {
                let explain_ctx = if session.vars().emit_plan_insights_notice() {
                    let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
                    ExplainContext::PlanInsightsNotice(optimizer_trace)
                } else {
                    ExplainContext::None
                };
                (select_plan, explain_ctx, None)
            }
            Plan::ExplainPlan(plan::ExplainPlanPlan {
                stage,
                format,
                config,
                explainee:
                    plan::Explainee::Statement(plan::ExplaineeStatement::Select { broken, plan, desc }),
            }) => {
                // Create an OptimizerTrace to collect optimizer plans.
                let optimizer_trace = OptimizerTrace::new(stage.paths());
                let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
                    broken: *broken,
                    config: config.clone(),
                    format: *format,
                    stage: *stage,
                    replan: None,
                    desc: Some(desc.clone()),
                    optimizer_trace,
                });
                (plan, explain_ctx, None)
            }
            // COPY TO S3
            Plan::CopyTo(plan::CopyToPlan {
                select_plan,
                desc,
                to,
                connection,
                connection_id,
                format,
                max_file_size,
            }) => {
                let uri = eval_copy_to_uri(to.clone(), session, catalog.state())?;

                // (`output_batch_count` will be set later.)
                let copy_to_ctx = CopyToContext {
                    desc: desc.clone(),
                    uri,
                    connection: connection.clone(),
                    connection_id: *connection_id,
                    format: format.clone(),
                    max_file_size: *max_file_size,
                    output_batch_count: None,
                };

                (select_plan, ExplainContext::None, Some(copy_to_ctx))
            }
            Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => {
                // Only handle EXPLAIN FILTER PUSHDOWN for SELECT statements.
                match explainee {
                    plan::Explainee::Statement(plan::ExplaineeStatement::Select {
                        broken: false,
                        plan,
                        desc: _,
                    }) => {
                        let explain_ctx = ExplainContext::Pushdown;
                        (plan, explain_ctx, None)
                    }
                    _ => {
                        debug!(
                            "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN"
                        );
                        return Ok(None);
                    }
                }
            }
            _ => {
                debug!(
                    "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO"
                );
                return Ok(None);
            }
        };

        // # From sequence_plan

        // We have checked the plan kind above.
        assert!(plan.allowed_in_read_only());

        let target_cluster = match session.transaction().cluster() {
            // Use the current transaction's cluster.
            Some(cluster_id) => TargetCluster::Transaction(cluster_id),
            // If there isn't a current cluster set for a transaction, then try to auto route.
            None => {
                coord::catalog_serving::auto_run_on_catalog_server(&conn_catalog, session, &plan)
            }
        };
        let (cluster, target_cluster_id, target_cluster_name) = {
            let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
            (cluster, cluster.id, &cluster.name)
        };

        // TODO(peek-seq): statement logging: set_statement_execution_cluster

        coord::catalog_serving::check_cluster_restrictions(
            target_cluster_name.as_str(),
            &conn_catalog,
            &plan,
        )?;

        rbac::check_plan(
            &conn_catalog,
            None::<fn(u32) -> Option<RoleId>>,
            session,
            &plan,
            Some(target_cluster_id),
            &resolved_ids,
        )?;

        // Check if we're still waiting for any of the builtin table appends from when we
        // started the Session to complete.
        //
        // (This is done slightly earlier in the normal peek sequencing, but we have to be past the
        // last use of `conn_catalog` here.)
        if coord::appends::waiting_on_startup_appends(&*catalog, session, &plan).is_some() {
            // TODO(peek-seq): Don't fall back to the coordinator's peek sequencing here, but call
            // `defer_op`. Needs `ExecuteContext`.
            // This fallback is currently causing a bug: `waiting_on_startup_appends` has the
            // side effect of already clearing the wait flag, so the old peek sequencing that we
            // fall back to here won't wait. This is tested by `test_mz_sessions` and
            // `test_pg_cancel_dropped_role`, where the frontend peek sequencing is disabled for
            // now. This bug will go away once we properly call `defer_op` here instead of
            // falling back to the old peek sequencing.
            debug!("Bailing out from try_frontend_peek_inner, because waiting_on_startup_appends");
            return Ok(None);
        }

        let max_query_result_size = Some(session.vars().max_query_result_size());

        // # From sequence_peek

        // # From peek_validate

        let compute_instance_snapshot =
            ComputeInstanceSnapshot::new_without_collections(cluster.id());

        let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config())
            .override_from(&catalog.get_cluster(cluster.id()).config.features())
            .override_from(&explain_ctx);

        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
            return Err(AdapterError::NoClusterReplicasAvailable {
                name: cluster.name.clone(),
                is_managed: cluster.is_managed(),
            });
        }

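        // Allocate transient IDs for the dataflow constructs that this peek may create
        // (the optimized view, and the index from which the peek optimizer reads).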
        let (_, view_id) = self.transient_id_gen.allocate_id();
        let (_, index_id) = self.transient_id_gen.allocate_id();

        let mut optimizer = if let Some(mut copy_to_ctx) = copy_to_ctx {
            // COPY TO path: calculate `output_batch_count` and create the copy_to optimizer.
            let worker_counts = cluster.replicas().map(|r| {
                let loc = &r.config.location;
                loc.workers().unwrap_or_else(|| loc.num_processes())
            });
            let max_worker_count = match worker_counts.max() {
                Some(count) => u64::cast_from(count),
                None => {
                    return Err(AdapterError::NoClusterReplicasAvailable {
                        name: cluster.name.clone(),
                        is_managed: cluster.is_managed(),
                    });
                }
            };
            copy_to_ctx.output_batch_count = Some(max_worker_count);

            Either::Right(optimize::copy_to::Optimizer::new(
                Arc::clone(&catalog),
                compute_instance_snapshot.clone(),
                view_id,
                copy_to_ctx,
                optimizer_config,
                self.optimizer_metrics.clone(),
            ))
        } else {
            // SELECT/EXPLAIN path: create the peek optimizer.
            Either::Left(optimize::peek::Optimizer::new(
                Arc::clone(&catalog),
                compute_instance_snapshot.clone(),
                select_plan.finishing.clone(),
                view_id,
                index_id,
                optimizer_config,
                self.optimizer_metrics.clone(),
            ))
        };

        let target_replica_name = session.vars().cluster_replica();
        let mut target_replica = target_replica_name
            .map(|name| {
                cluster
                    .replica_id(name)
                    .ok_or(AdapterError::UnknownClusterReplica {
                        cluster_name: cluster.name.clone(),
                        replica_name: name.to_string(),
                    })
            })
            .transpose()?;

        let source_ids = select_plan.source.depends_on();
        // TODO(peek-seq): validate_timeline_context can be expensive in real scenarios (not in
        // simple benchmarks), because it traverses transitive dependencies even of indexed views
        // and materialized views (also traversing their MIR plans).
        let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?;
        if matches!(timeline_context, TimelineContext::TimestampIndependent)
            && select_plan.source.contains_temporal()?
        {
            // If the source IDs are timestamp independent but the query contains temporal
            // functions, then the timeline context needs to be upgraded to timestamp dependent.
            // This is required because `source_ids` doesn't contain functions.
            timeline_context = TimelineContext::TimestampDependent;
        }

        let notices = coord::sequencer::check_log_reads(
            &catalog,
            cluster,
            &source_ids,
            &mut target_replica,
            session.vars(),
        )?;
        session.add_notices(notices);

        // # From peek_linearize_timestamp

        let isolation_level = session.vars().transaction_isolation().clone();
        let timeline = Coordinator::get_timeline(&timeline_context);
        let needs_linearized_read_ts =
            Coordinator::needs_linearized_read_ts(&isolation_level, &select_plan.when);

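        // Reading the timestamp oracle linearizes this query with respect to earlier
        // operations in the same timeline; `needs_linearized_read_ts` determines whether
        // the isolation level and `QueryWhen` require this.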
        let oracle_read_ts = match timeline {
            Some(timeline) if needs_linearized_read_ts => {
                let oracle = self.ensure_oracle(timeline).await?;
                let oracle_read_ts = oracle.read_ts().await;
                Some(oracle_read_ts)
            }
            Some(_) | None => None,
        };

        // # From peek_real_time_recency

        let vars = session.vars();
        let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
            && !session.contains_read_timestamp()
        {
            // Only call the coordinator when we actually need real-time recency.
            self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
                source_ids: source_ids.clone(),
                real_time_recency_timeout: *vars.real_time_recency_timeout(),
                tx,
            })
            .await?
        } else {
            None
        };

        // # From peek_timestamp_read_hold

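        // Expand the raw source IDs into a bundle of collections sufficient to compute
        // them on this cluster (e.g., indexes that can serve the sources).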
        let dataflow_builder = DataflowBuilder::new(catalog.state(), compute_instance_snapshot);
        let input_id_bundle = dataflow_builder.sufficient_collections(source_ids.clone());

        // ## From sequence_peek_timestamp

        // Warning: This will be false for AS OF queries, even if we are otherwise inside a
        // multi-statement transaction. (It's also false for FreshestTableWrite, which currently
        // covers only read-then-write queries; those can't be part of multi-statement
        // transactions, so FreshestTableWrite doesn't matter here.)
        //
        // TODO(peek-seq): It's not totally clear what the intended semantics are for AS OF
        // queries inside a transaction: we clearly can't use the transaction timestamp, but the
        // old peek sequencing still does a timedomain validation. The new peek sequencing does
        // not do timedomain validation for AS OF queries, which seems more natural. But it would
        // be cleanest to simply disallow AS OF queries inside transactions.
        let in_immediate_multi_stmt_txn = session
            .transaction()
            .in_immediate_multi_stmt_txn(&select_plan.when);

        // Fetch or generate a timestamp for this query and fetch or acquire read holds.
        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
            // Use the transaction's timestamp if it exists and this isn't an AS OF query.
            // (`in_immediate_multi_stmt_txn` is false for AS OF queries.)
            Some(
                determination @ TimestampDetermination {
                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
                    ..
                },
            ) if in_immediate_multi_stmt_txn => {
                // This is a subsequent (non-AS OF, non-constant) query in a multi-statement
                // transaction. We now:
                // - Validate that the query only accesses collections within the transaction's
                //   timedomain (which we know from the stored read holds).
                // - Use the transaction's stored timestamp determination.
                // - Use the (relevant subset of the) transaction's read holds.

                let txn_read_holds_opt = self
                    .call_coordinator(|tx| Command::GetTransactionReadHoldsBundle {
                        conn_id: session.conn_id().clone(),
                        tx,
                    })
                    .await;

                if let Some(txn_read_holds) = txn_read_holds_opt {
                    let allowed_id_bundle = txn_read_holds.id_bundle();
                    let outside = input_id_bundle.difference(&allowed_id_bundle);

                    // Queries without a timestamp and timeline can belong to any existing timedomain.
                    if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
                        let valid_names =
                            allowed_id_bundle.resolve_names(&*catalog, session.conn_id());
                        let invalid_names = outside.resolve_names(&*catalog, session.conn_id());
                        return Err(AdapterError::RelationOutsideTimeDomain {
                            relations: invalid_names,
                            names: valid_names,
                        });
                    }

                    // Extract the subset of read holds for the collections this query accesses.
                    let read_holds = txn_read_holds.subset(&input_id_bundle);

                    (determination, read_holds)
                } else {
                    // This should never happen: we're in a subsequent query of a multi-statement
                    // transaction (we have a transaction timestamp), but the coordinator has no
                    // transaction read holds stored. This indicates a bug in the transaction
                    // handling.
                    return Err(AdapterError::Internal(
                        "Missing transaction read holds for multi-statement transaction"
                            .to_string(),
                    ));
                }
            }
            _ => {
                // There is no timestamp determination yet for this transaction. Either:
                // - We are not in a multi-statement transaction.
                // - This is the first (non-AS OF) query in a multi-statement transaction.
                // - This is an AS OF query.
                // - This is a constant query (`TimestampContext::NoTimestamp`).

                let timedomain_bundle;
                let determine_bundle = if in_immediate_multi_stmt_txn {
                    // This is the first (non-AS OF) query in a multi-statement transaction.
                    // Determine a timestamp that will be valid for anything in any schema
                    // referenced by the first query.
                    timedomain_bundle = timedomain_for(
                        &*catalog,
                        &dataflow_builder,
                        &source_ids,
                        &timeline_context,
                        session.conn_id(),
                        target_cluster_id,
                    )?;
                    &timedomain_bundle
                } else {
                    // Simply use the inputs of the current query.
                    &input_id_bundle
                };
                let (determination, read_holds) = self
                    .frontend_determine_timestamp(
                        catalog.state(),
                        session,
                        determine_bundle,
                        &select_plan.when,
                        target_cluster_id,
                        &timeline_context,
                        oracle_read_ts,
                        real_time_recency_ts,
                    )
                    .await?;

                // If this is the first (non-AS OF) query in a multi-statement transaction, store
                // the read holds in the coordinator, so subsequent queries can validate against
                // them.
                if in_immediate_multi_stmt_txn {
                    self.call_coordinator(|tx| Command::StoreTransactionReadHolds {
                        conn_id: session.conn_id().clone(),
                        read_holds: read_holds.clone(),
                        tx,
                    })
                    .await;
                }

                (determination, read_holds)
            }
        };

        // (TODO(peek-seq): The below TODO is copied from the old peek sequencing. We should
        // resolve it when we decide what to do with `AS OF` in transactions.)
        // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
        // arbitrary and we don't recall why we did it (possibly an error!). Change this to always
        // set the transaction ops. Decide and document what our policy should be on AS OF queries.
        // Maybe they shouldn't be allowed in transactions at all because it's hard to explain
        // what's going on there. This should probably get a small design document.

        // We only track the peeks in the session if the query doesn't use AS
        // OF or we're inside an explicit transaction. The latter case is
        // necessary to support PG's `BEGIN` semantics, whose behavior can
        // depend on whether or not reads have occurred in the txn.
        let requires_linearization = (&explain_ctx).into();
        let mut transaction_determination = determination.clone();
        if select_plan.when.is_transactional() {
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id: target_cluster_id,
                requires_linearization,
            })?;
        } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
            // If the query uses AS OF, then ignore the timestamp.
            transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id: target_cluster_id,
                requires_linearization,
            })?;
        };

        // # From peek_optimize

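        // Fetch cardinality statistics for the optimizer. If the statistics oracle is
        // unavailable, fall back to `EmptyStatisticsOracle` rather than failing the peek.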
        let stats = statistics_oracle(
            session,
            &source_ids,
            &determination.timestamp_context.antichain(),
            true,
            catalog.system_config(),
            &*self.storage_collections,
        )
        .await
        .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));

        // Generate data structures that can be moved to another task where we will perform
        // possibly expensive optimizations.
        let timestamp_context = determination.timestamp_context.clone();
        let session_meta = session.meta();
        let now = catalog.config().now.clone();
        let select_plan = select_plan.clone();
        let target_cluster_name = target_cluster_name.clone();
        let needs_plan_insights = explain_ctx.needs_plan_insights();
        let determination_for_pushdown = if matches!(explain_ctx, ExplainContext::Pushdown) {
            // This is a hairy data structure, so avoid this clone if we are not in
            // EXPLAIN FILTER PUSHDOWN.
            Some(determination.clone())
        } else {
            None
        };

        let span = Span::current();

        // Prepare data for plan insights if needed.
        let catalog_for_insights = if needs_plan_insights {
            Some(Arc::clone(&catalog))
        } else {
            None
        };
        let mut compute_instances = BTreeMap::new();
        if needs_plan_insights {
            for user_cluster in catalog.user_clusters() {
                let snapshot = ComputeInstanceSnapshot::new_without_collections(user_cluster.id);
                compute_instances.insert(user_cluster.name.clone(), snapshot);
            }
        }

        // Enum for branching among the various execution steps after optimization.
        enum Execution {
            Peek {
                global_lir_plan: optimize::peek::GlobalLirPlan,
                optimization_finished_at: EpochMillis,
                plan_insights_optimizer_trace: Option<OptimizerTrace>,
                insights_ctx: Option<Box<PlanInsightsContext>>,
            },
            CopyToS3 {
                global_lir_plan: optimize::copy_to::GlobalLirPlan,
                source_ids: BTreeSet<GlobalId>,
            },
            ExplainPlan {
                df_meta: DataflowMetainfo,
                explain_ctx: ExplainPlanContext,
                optimizer: optimize::peek::Optimizer,
                insights_ctx: Option<Box<PlanInsightsContext>>,
            },
            ExplainPushdown {
                imports: BTreeMap<GlobalId, mz_expr::MapFilterProject>,
                determination: TimestampDetermination<Timestamp>,
            },
        }

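        // Run the potentially expensive optimization pipeline on a blocking task, so that
        // it doesn't stall the async executor.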
        let source_ids_for_closure = source_ids.clone();
        let optimization_result = mz_ore::task::spawn_blocking(
            || "optimize peek",
            move || {
                span.in_scope(|| {
                    let _dispatch_guard = explain_ctx.dispatch_guard();

                    let raw_expr = select_plan.source.clone();

                    // The purpose of wrapping the following in a closure is to control where the
                    // `?`s return from, so that even when a `catch_unwind_optimize` call fails,
                    // we can still handle `EXPLAIN BROKEN`.
                    let pipeline = || -> Result<Either<optimize::peek::GlobalLirPlan, optimize::copy_to::GlobalLirPlan>, OptimizerError> {
                        match optimizer.as_mut() {
                            Either::Left(optimizer) => {
                                // SELECT/EXPLAIN path
                                // HIR ⇒ MIR lowering and MIR optimization (local)
                                let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr.clone())?;
                                // Attach resolved context required to continue the pipeline.
                                let local_mir_plan =
                                    local_mir_plan.resolve(timestamp_context.clone(), &session_meta, stats);
                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
                                let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
                                Ok(Either::Left(global_lir_plan))
                            }
                            Either::Right(optimizer) => {
                                // COPY TO path
                                // HIR ⇒ MIR lowering and MIR optimization (local)
                                let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr.clone())?;
                                // Attach resolved context required to continue the pipeline.
                                let local_mir_plan =
                                    local_mir_plan.resolve(timestamp_context.clone(), &session_meta, stats);
                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
                                let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
                                Ok(Either::Right(global_lir_plan))
                            }
                        }
                    };

                    let global_lir_plan_result = pipeline();
                    let optimization_finished_at = now();

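                    // Helper that assembles the context needed for computing plan insights,
                    // used both for `EXPLAIN` output and for the plan insights notice.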
                    let create_insights_ctx = |optimizer: &optimize::peek::Optimizer, is_notice: bool| -> Option<Box<PlanInsightsContext>> {
                        if !needs_plan_insights {
                            return None;
                        }

                        let catalog = catalog_for_insights.as_ref()?;

                        let enable_re_optimize = if needs_plan_insights {
                            // Disable any plan insights that use the optimizer if we only want the
                            // notice and plan optimization took longer than the threshold. This is
                            // to prevent a situation where optimizing takes a while and there are
                            // lots of clusters, which would delay peek execution by the product of
                            // those.
                            //
                            // (This heuristic doesn't work well, see #9492.)
                            let opt_limit = mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
                                .get(catalog.system_config().dyncfgs());
                            !(is_notice && optimizer.duration() > opt_limit)
                        } else {
                            false
                        };

                        Some(Box::new(PlanInsightsContext {
                            stmt: select_plan.select.as_deref().cloned().map(Statement::Select),
                            raw_expr: raw_expr.clone(),
                            catalog: Arc::clone(catalog),
                            compute_instances,
                            target_instance: target_cluster_name,
                            metrics: optimizer.metrics().clone(),
                            finishing: optimizer.finishing().clone(),
                            optimizer_config: optimizer.config().clone(),
                            session: session_meta,
                            timestamp_context,
                            view_id: optimizer.select_id(),
                            index_id: optimizer.index_id(),
                            enable_re_optimize,
                        }))
                    };

                    match global_lir_plan_result {
                        Ok(Either::Left(global_lir_plan)) => {
                            // SELECT/EXPLAIN path
                            let optimizer = optimizer.unwrap_left();
                            match explain_ctx {
                                ExplainContext::Plan(explain_ctx) => {
                                    let (_, df_meta, _) = global_lir_plan.unapply();
                                    let insights_ctx = create_insights_ctx(&optimizer, false);
                                    Ok(Execution::ExplainPlan {
                                        df_meta,
                                        explain_ctx,
                                        optimizer,
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::None => {
                                    Ok(Execution::Peek {
                                        global_lir_plan,
                                        optimization_finished_at,
                                        plan_insights_optimizer_trace: None,
                                        insights_ctx: None,
                                    })
                                }
                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
                                    let insights_ctx = create_insights_ctx(&optimizer, true);
                                    Ok(Execution::Peek {
                                        global_lir_plan,
                                        optimization_finished_at,
                                        plan_insights_optimizer_trace: Some(optimizer_trace),
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::Pushdown => {
                                    let (plan, _, _) = global_lir_plan.unapply();
                                    let imports = match plan {
                                        PeekPlan::SlowPath(plan) => plan
                                            .desc
                                            .source_imports
                                            .into_iter()
                                            .filter_map(|(id, (desc, _, _upper))| {
                                                desc.arguments.operators.map(|mfp| (id, mfp))
                                            })
                                            .collect(),
                                        PeekPlan::FastPath(_) => BTreeMap::default(),
                                    };
                                    Ok(Execution::ExplainPushdown {
                                        imports,
                                        determination: determination_for_pushdown
                                            .expect("it's present for the ExplainPushdown case"),
                                    })
                                }
                            }
                        }
                        Ok(Either::Right(global_lir_plan)) => {
                            // COPY TO S3 path
                            Ok(Execution::CopyToS3 {
                                global_lir_plan,
                                source_ids: source_ids_for_closure,
                            })
                        }
                        Err(err) => {
                            if optimizer.is_right() {
                                // COPY TO has no EXPLAIN BROKEN support.
                                return Err(err);
                            }
                            // SELECT/EXPLAIN error handling
                            let optimizer = optimizer.expect_left("checked above");
                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
                                if explain_ctx.broken {
                                    // EXPLAIN BROKEN: log the error and continue with defaults.
                                    tracing::error!("error while handling EXPLAIN statement: {}", err);
                                    Ok(Execution::ExplainPlan {
                                        df_meta: Default::default(),
                                        explain_ctx,
                                        optimizer,
                                        insights_ctx: None,
                                    })
                                } else {
                                    Err(err)
                                }
                            } else {
                                Err(err)
                            }
                        }
                    }
                })
            },
        )
        .await
        .map_err(|optimizer_error| {
            AdapterError::Internal(format!("internal error in optimizer: {}", optimizer_error))
        })?;

        // Handle the optimization result: either generate EXPLAIN output or continue with
        // execution.
        match optimization_result {
            Execution::ExplainPlan {
                df_meta,
                explain_ctx,
                optimizer,
                insights_ctx,
            } => {
                let rows = coord::sequencer::explain_plan_inner(
                    session,
                    &catalog,
                    df_meta,
                    explain_ctx,
                    optimizer,
                    insights_ctx,
                )
                .await?;

                Ok(Some(ExecuteResponse::SendingRowsImmediate {
                    rows: Box::new(rows.into_row_iter()),
                }))
            }
            Execution::ExplainPushdown {
                imports,
                determination,
            } => {
                // # From peek_explain_pushdown

                let as_of = determination.timestamp_context.antichain();
                let mz_now = determination
                    .timestamp_context
                    .timestamp()
                    .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
                    .unwrap_or_else(ResultSpec::value_all);

                Ok(Some(
                    coord::sequencer::explain_pushdown_future_inner(
                        session,
                        &*catalog,
                        &self.storage_collections,
                        as_of,
                        mz_now,
                        imports,
                    )
                    .await
                    .await?,
                ))
            }
            Execution::Peek {
                global_lir_plan,
                optimization_finished_at: _optimization_finished_at,
                plan_insights_optimizer_trace,
                insights_ctx,
            } => {
                // Continue with normal execution.
                // # From peek_finish

                // TODO(peek-seq): statement logging

                let (peek_plan, df_meta, typ) = global_lir_plan.unapply();

                // Warning: Do not bail out from the new peek sequencing after this point, because
                // the following has side effects. TODO(peek-seq): remove this comment once we
                // never bail out to the old sequencing.

                coord::sequencer::emit_optimizer_notices(
                    &*catalog,
                    session,
                    &df_meta.optimizer_notices,
                );

                // Generate a plan insights notice if needed.
                if let Some(trace) = plan_insights_optimizer_trace {
                    let target_cluster = catalog.get_cluster(target_cluster_id);
                    let features = OptimizerFeatures::from(catalog.system_config())
                        .override_from(&target_cluster.config.features());
                    let insights = trace
                        .into_plan_insights(
                            &features,
                            &catalog.for_session(session),
                            Some(select_plan.finishing.clone()),
                            Some(target_cluster),
                            df_meta.clone(),
                            insights_ctx,
                        )
                        .await?;
                    session.add_notice(AdapterNotice::PlanInsights(insights));
                }

                // TODO(peek-seq): Move this up to the beginning of the function when we have
                // eliminated all the fallbacks to the old peek sequencing. Currently, it has to
                // be here to avoid double-counting a fallback situation, but this has the
                // drawback that if we error out from this function then we don't count the peek
                // at all.
                session
                    .metrics()
                    .query_total(&[session_type, stmt_type])
                    .inc();

                // # Now back to peek_finish

                // TODO(peek-seq): statement logging

                let max_result_size = catalog.system_config().max_result_size();

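                // Fast-path peeks can be issued directly from here, while slow-path peeks
                // require building and installing a dataflow, which is done by the
                // coordinator (see `Command::ExecuteSlowPathPeek`).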
                let response = match peek_plan {
                    PeekPlan::FastPath(fast_path_plan) => {
                        let row_set_finishing_seconds =
                            session.metrics().row_set_finishing_seconds().clone();

                        let peek_stash_read_batch_size_bytes =
                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
                                .get(catalog.system_config().dyncfgs());
                        let peek_stash_read_memory_budget_bytes =
                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
                                .get(catalog.system_config().dyncfgs());

                        self.implement_fast_path_peek_plan(
                            fast_path_plan,
                            determination.timestamp_context.timestamp_or_default(),
                            select_plan.finishing,
                            target_cluster_id,
                            target_replica,
                            typ,
                            max_result_size,
                            max_query_result_size,
                            row_set_finishing_seconds,
                            read_holds,
                            peek_stash_read_batch_size_bytes,
                            peek_stash_read_memory_budget_bytes,
                        )
                        .await?
                    }
                    PeekPlan::SlowPath(dataflow_plan) => {
                        self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
                            dataflow_plan: Box::new(dataflow_plan),
                            determination,
                            finishing: select_plan.finishing,
                            compute_instance: target_cluster_id,
                            target_replica,
                            intermediate_result_type: typ,
                            source_ids,
                            conn_id: session.conn_id().clone(),
                            max_result_size,
                            max_query_result_size,
                            tx,
                        })
                        .await?
                    }
                };

                Ok(Some(match select_plan.copy_to {
                    None => response,
                    // COPY TO STDOUT
                    Some(format) => ExecuteResponse::CopyTo {
                        format,
                        resp: Box::new(response),
                    },
                }))
            }
            Execution::CopyToS3 {
                global_lir_plan,
                source_ids,
            } => {
                let (df_desc, df_meta) = global_lir_plan.unapply();

                coord::sequencer::emit_optimizer_notices(
                    &*catalog,
                    session,
                    &df_meta.optimizer_notices,
                );

                let response = self
                    .call_coordinator(|tx| Command::ExecuteCopyTo {
                        df_desc: Box::new(df_desc),
                        compute_instance: target_cluster_id,
                        target_replica,
                        source_ids,
                        conn_id: session.conn_id().clone(),
                        tx,
                    })
                    .await?;

                Ok(Some(response))
            }
        }
    }

    /// Determines the timestamp for a query, acquires read holds that ensure the query
    /// remains executable at that time, and returns those read holds. The caller is
    /// responsible for eventually dropping them.
    ///
    /// (Similar to `Coordinator::determine_timestamp`.)
    ///
    /// Note: `self` is taken as `&mut` because of the lazy fetching in
    /// `get_compute_instance_client`.
    pub(crate) async fn frontend_determine_timestamp(
        &mut self,
        catalog_state: &CatalogState,
        session: &Session,
        id_bundle: &CollectionIdBundle,
        when: &QueryWhen,
        compute_instance: ComputeInstanceId,
        timeline_context: &TimelineContext,
        oracle_read_ts: Option<Timestamp>,
        real_time_recency_ts: Option<Timestamp>,
    ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
        // This is copy-pasted from `Coordinator`.

        let constraint_based = ConstraintBasedTimestampSelection::from_str(
            &CONSTRAINT_BASED_TIMESTAMP_SELECTION.get(catalog_state.system_config().dyncfgs()),
        );

        let isolation_level = session.vars().transaction_isolation();

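        // Acquire read holds on all inputs up front; these pin the inputs' read frontiers
        // while a timestamp is chosen, ensuring the chosen timestamp stays queryable.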
        let (read_holds, upper) = self
            .acquire_read_holds_and_least_valid_write(id_bundle)
            .await
            .map_err(|err| {
                AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
                    err,
                    compute_instance,
                )
            })?;
        let (det, read_holds) = <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
            session,
            id_bundle,
            when,
            compute_instance,
            timeline_context,
            oracle_read_ts,
            real_time_recency_ts,
            isolation_level,
            &constraint_based,
            read_holds,
            upper.clone(),
        )?;

        session
            .metrics()
            .determine_timestamp(&[
                match det.respond_immediately() {
                    true => "true",
                    false => "false",
                },
                isolation_level.as_str(),
                &compute_instance.to_string(),
                constraint_based.as_str(),
            ])
            .inc();
        if !det.respond_immediately()
            && isolation_level == &IsolationLevel::StrictSerializable
            && real_time_recency_ts.is_none()
        {
            // Record the difference between the StrictSerializable and Serializable timestamps
            // in a metric, by computing what the timestamp would have been under Serializable.
            if let Some(strict) = det.timestamp_context.timestamp() {
                let (serializable_det, _tmp_read_holds) =
                    <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
                        session,
                        id_bundle,
                        when,
                        compute_instance,
                        timeline_context,
                        oracle_read_ts,
                        real_time_recency_ts,
                        &IsolationLevel::Serializable,
                        &constraint_based,
                        read_holds.clone(),
                        upper,
                    )?;
                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
                    session
                        .metrics()
                        .timestamp_difference_for_strict_serializable_ms(&[
                            compute_instance.to_string().as_ref(),
                            constraint_based.as_str(),
                        ])
                        .observe(f64::cast_lossy(u64::from(
                            strict.saturating_sub(*serializable),
                        )));
                }
            }
        }

        Ok((det, read_holds))
    }
}