Skip to main content

mz_adapter/
frontend_peek.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::collections::BTreeSet;
12use std::sync::Arc;
13use std::time::Duration;
14
15use itertools::Itertools;
16use mz_adapter_types::dyncfgs::ENABLE_FRONTEND_SUBSCRIBES;
17use mz_compute_types::ComputeInstanceId;
18use mz_compute_types::dataflows::DataflowDescription;
19use mz_controller_types::ClusterId;
20use mz_expr::{CollectionPlan, ResultSpec, RowSetFinishing};
21use mz_ore::cast::{CastFrom, CastLossy};
22use mz_ore::collections::CollectionExt;
23use mz_ore::now::EpochMillis;
24use mz_ore::task::JoinHandle;
25use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
26use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
27use mz_repr::role_id::RoleId;
28use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
29use mz_sql::ast::Raw;
30use mz_sql::catalog::CatalogCluster;
31use mz_sql::plan::Params;
32use mz_sql::plan::{
33    self, Explainee, ExplaineeStatement, Plan, QueryWhen, SelectPlan, SubscribePlan,
34};
35use mz_sql::rbac;
36use mz_sql::session::metadata::SessionMetadata;
37use mz_sql::session::vars::IsolationLevel;
38use mz_sql_parser::ast::{CopyDirection, ExplainStage, ShowStatement, Statement};
39use mz_transform::EmptyStatisticsOracle;
40use mz_transform::dataflow::DataflowMetainfo;
41use opentelemetry::trace::TraceContextExt;
42use timely::progress::Antichain;
43use tracing::{Span, debug, warn};
44use tracing_opentelemetry::OpenTelemetrySpanExt;
45
46use crate::catalog::Catalog;
47use crate::command::Command;
48use crate::coord::peek::{FastPathPlan, PeekPlan};
49use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle};
50use crate::coord::timeline::timedomain_for;
51use crate::coord::timestamp_selection::TimestampDetermination;
52use crate::coord::{
53    Coordinator, CopyToContext, ExecuteContextGuard, ExplainContext, ExplainPlanContext,
54    TargetCluster,
55};
56use crate::explain::insights::PlanInsightsContext;
57use crate::explain::optimizer_trace::OptimizerTrace;
58use crate::optimize::Optimize;
59use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
60use crate::session::{Session, TransactionOps, TransactionStatus};
61use crate::statement_logging::WatchSetCreation;
62use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
63use crate::{
64    AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, PeekClient, ReadHolds,
65    TimelineContext, TimestampContext, TimestampProvider, optimize,
66};
67use crate::{coord, metrics};
68
69impl PeekClient {
70    /// Attempt to sequence a peek from the session task.
71    ///
72    /// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the
73    /// Coordinator's sequencing. If it returns an error, it should be returned to the user.
74    ///
75    /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
76    /// triggering the execution of the underlying query.
77    pub(crate) async fn try_frontend_peek(
78        &mut self,
79        portal_name: &str,
80        catalog: Option<Arc<Catalog>>,
81        session: &mut Session,
82        outer_ctx_extra: &mut Option<ExecuteContextGuard>,
83    ) -> Result<Option<ExecuteResponse>, AdapterError> {
84        // # From handle_execute
85
86        if session.vars().emit_trace_id_notice() {
87            let span_context = tracing::Span::current()
88                .context()
89                .span()
90                .span_context()
91                .clone();
92            if span_context.is_valid() {
93                session.add_notice(AdapterNotice::QueryTrace {
94                    trace_id: span_context.trace_id(),
95                });
96            }
97        }
98
99        // TODO(peek-seq): This snapshot is wasted when we end up bailing out from the frontend peek
100        // sequencing. We could solve this is with that optimization where we
101        // continuously keep a catalog snapshot in the session, and only get a new one when the
102        // catalog revision has changed, which we could see with an atomic read.
103        // But anyhow, this problem will just go away when we reach the point that we never fall
104        // back to the old sequencing.
105        let catalog = match catalog {
106            Some(c) => c,
107            None => self.catalog_snapshot("try_frontend_peek").await,
108        };
109
110        // Extract things from the portal.
111        let (stmt, params, logging, lifecycle_timestamps) = {
112            if let Err(err) = Coordinator::verify_portal(&*catalog, session, portal_name) {
113                outer_ctx_extra
114                    .take()
115                    .and_then(|guard| guard.defuse().retire());
116                return Err(err);
117            }
118            let portal = session
119                .get_portal_unverified(portal_name)
120                // The portal is a session-level thing, so it couldn't have concurrently disappeared
121                // since the above verification.
122                .expect("called verify_portal above");
123            let params = portal.parameters.clone();
124            let stmt = portal.stmt.clone();
125            let logging = Arc::clone(&portal.logging);
126            let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
127            (stmt, params, logging, lifecycle_timestamps)
128        };
129
130        // Before planning, check if this is a statement type we can handle.
131        // This must happen BEFORE statement logging setup to avoid orphaned execution records.
132        if let Some(ref stmt) = stmt {
133            match &**stmt {
134                Statement::Select(_)
135                | Statement::ExplainAnalyzeObject(_)
136                | Statement::ExplainAnalyzeCluster(_)
137                | Statement::Show(ShowStatement::ShowObjects(_))
138                | Statement::Show(ShowStatement::ShowColumns(_)) => {
139                    // These are always fine, just continue.
140                    // Note: EXPLAIN ANALYZE will `plan` to `Plan::Select`.
141                    // Note: ShowObjects plans to `Plan::Select`, ShowColumns plans to `Plan::ShowColumns`.
142                    // We handle `Plan::ShowColumns` specially in `try_frontend_peek_inner`.
143                }
144                Statement::ExplainPlan(explain_stmt) => {
145                    // Only handle ExplainPlan for SELECT statements.
146                    // We don't want to handle e.g. EXPLAIN CREATE MATERIALIZED VIEW here, because that
147                    // requires purification before planning, which the frontend peek sequencing doesn't
148                    // do.
149                    match &explain_stmt.explainee {
150                        mz_sql_parser::ast::Explainee::Select(..) => {
151                            // This is a SELECT, continue
152                        }
153                        _ => {
154                            debug!(
155                                "Bailing out from try_frontend_peek, because EXPLAIN is not for a SELECT query"
156                            );
157                            return Ok(None);
158                        }
159                    }
160                }
161                Statement::ExplainPushdown(explain_stmt) => {
162                    // Only handle EXPLAIN FILTER PUSHDOWN for non-BROKEN SELECT statements
163                    match &explain_stmt.explainee {
164                        mz_sql_parser::ast::Explainee::Select(_, false) => {}
165                        _ => {
166                            debug!(
167                                "Bailing out from try_frontend_peek, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is for EXPLAIN BROKEN"
168                            );
169                            return Ok(None);
170                        }
171                    }
172                }
173                Statement::Copy(copy_stmt) => {
174                    match &copy_stmt.direction {
175                        CopyDirection::To => {
176                            // This is COPY TO (...), continue
177                        }
178                        CopyDirection::From => {
179                            debug!(
180                                "Bailing out from try_frontend_peek, because COPY FROM is not supported"
181                            );
182                            return Ok(None);
183                        }
184                    }
185                }
186
187                Statement::Subscribe(_)
188                    if ENABLE_FRONTEND_SUBSCRIBES.get(catalog.system_config().dyncfgs()) =>
189                {
190                    // We have a subscribe statement to process; continue.
191                }
192                _ => {
193                    debug!(
194                        "Bailing out from try_frontend_peek, because statement type is not supported"
195                    );
196                    return Ok(None);
197                }
198            }
199        }
200
201        // Set up statement logging, and log the beginning of execution.
202        // (But only if we're not executing in the context of another statement.)
203        let logging_guard = self.begin_statement_logging(
204            session,
205            &params,
206            &logging,
207            &catalog,
208            lifecycle_timestamps,
209            outer_ctx_extra,
210        );
211        let statement_logging_id = logging_guard.id();
212        // Streaming peek/subscribe/slow-path/copy-to responses hand off the
213        // end-execution event to the coordinator (via
214        // `handle_peek_notification` / `cancel_pending_peeks`), so letting
215        // the RAII guard's `Drop` also emit `Aborted` on mid-flight drop
216        // would double-end and panic at `end_statement_execution`. Defuse
217        // and manage the lifecycle explicitly below: streaming arms skip
218        // emitting; non-streaming arms emit via `log_ended_execution`.
219        logging_guard.defuse();
220
221        let result = self
222            .try_frontend_peek_inner(session, catalog, stmt, params, statement_logging_id)
223            .await;
224
225        // Log the end of execution if we are logging this statement and
226        // execution has already ended.
227        if let Some(logging_id) = statement_logging_id {
228            let reason = match &result {
229                // Streaming results are handled asynchronously by the
230                // coordinator — it will log the end via
231                // `handle_peek_notification`, so skip emitting here.
232                Ok(Some(
233                    ExecuteResponse::SendingRowsStreaming { .. }
234                    | ExecuteResponse::Subscribing { .. },
235                )) => {
236                    return result;
237                }
238                // COPY TO wrapping a streaming response: same handoff.
239                Ok(Some(resp @ ExecuteResponse::CopyTo { resp: inner, .. })) => {
240                    match inner.as_ref() {
241                        ExecuteResponse::SendingRowsStreaming { .. }
242                        | ExecuteResponse::Subscribing { .. } => {
243                            return result;
244                        }
245                        // For non-streaming COPY TO responses, use the outer
246                        // CopyTo for conversion.
247                        _ => resp.into(),
248                    }
249                }
250                // Bailout case, which should not happen.
251                Ok(None) => {
252                    soft_panic_or_log!(
253                        "Bailed out from `try_frontend_peek_inner` after we already logged the beginning of statement execution."
254                    );
255                    // The old peek sequencing would start its own statement
256                    // logging from scratch; close out this one as errored.
257                    self.log_ended_execution(
258                        logging_id,
259                        StatementEndedExecutionReason::Errored {
260                            error: "Internal error: bailed out from `try_frontend_peek_inner`"
261                                .to_string(),
262                        },
263                    );
264                    return result;
265                }
266                // All other success responses — use the `From` implementation.
267                // TODO(peek-seq): After we delete the old peek sequencing, we
268                // can adjust the `From` impl to do exactly what we need here,
269                // so the special cases above won't be needed.
270                Ok(Some(resp)) => resp.into(),
271                Err(e) => StatementEndedExecutionReason::Errored {
272                    error: e.to_string(),
273                },
274            };
275
276            self.log_ended_execution(logging_id, reason);
277        }
278
279        result
280    }
281
282    /// This is encapsulated in an inner function so that the outer function can still do statement
283    /// logging after the `?` returns of the inner function.
284    async fn try_frontend_peek_inner(
285        &mut self,
286        session: &mut Session,
287        catalog: Arc<Catalog>,
288        stmt: Option<Arc<Statement<Raw>>>,
289        params: Params,
290        statement_logging_id: Option<crate::statement_logging::StatementLoggingId>,
291    ) -> Result<Option<ExecuteResponse>, AdapterError> {
292        let stmt = match stmt {
293            Some(stmt) => stmt,
294            None => {
295                debug!("try_frontend_peek_inner succeeded on an empty query");
296                return Ok(Some(ExecuteResponse::EmptyQuery));
297            }
298        };
299
300        session
301            .metrics()
302            .query_total(&[
303                metrics::session_type_label_value(session.user()),
304                metrics::statement_type_label_value(&stmt),
305            ])
306            .inc();
307
308        // # From handle_execute_inner
309
310        let conn_catalog = catalog.for_session(session);
311        // (`resolved_ids` should be derivable from `stmt`. If `stmt` is later transformed to
312        // remove/add IDs, then `resolved_ids` should be updated to also remove/add those IDs.)
313        let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?;
314
315        let pcx = session.pcx();
316        let (plan, sql_impl_ids) =
317            mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, &params, &resolved_ids)?;
318
319        /// What do we do with the result of the select?
320        enum QueryPlan<'a> {
321            Select(&'a SelectPlan),
322            CopyTo(&'a SelectPlan, CopyToContext),
323            Subscribe(&'a SubscribePlan),
324        }
325
326        let (query_plan, explain_ctx) = match &plan {
327            Plan::Select(select_plan) => {
328                let explain_ctx = if session.vars().emit_plan_insights_notice() {
329                    let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
330                    ExplainContext::PlanInsightsNotice(optimizer_trace)
331                } else {
332                    ExplainContext::None
333                };
334                (QueryPlan::Select(select_plan), explain_ctx)
335            }
336            Plan::ShowColumns(show_columns_plan) => {
337                // ShowColumns wraps a SelectPlan, extract it and proceed as normal.
338                (
339                    QueryPlan::Select(&show_columns_plan.select_plan),
340                    ExplainContext::None,
341                )
342            }
343            Plan::ExplainPlan(plan::ExplainPlanPlan {
344                stage,
345                format,
346                config,
347                explainee: Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc }),
348            }) => {
349                // Create OptimizerTrace to collect optimizer plans
350                let optimizer_trace = OptimizerTrace::new(stage.paths());
351                let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
352                    broken: *broken,
353                    config: config.clone(),
354                    format: *format,
355                    stage: *stage,
356                    replan: None,
357                    desc: Some(desc.clone()),
358                    optimizer_trace,
359                });
360                (QueryPlan::Select(plan), explain_ctx)
361            }
362            // COPY TO S3
363            Plan::CopyTo(plan::CopyToPlan {
364                select_plan,
365                desc,
366                to,
367                connection,
368                connection_id,
369                format,
370                max_file_size,
371            }) => {
372                let uri = eval_copy_to_uri(to.clone(), session, catalog.state())?;
373
374                // (output_batch_count will be set later)
375                let copy_to_ctx = CopyToContext {
376                    desc: desc.clone(),
377                    uri,
378                    connection: connection.clone(),
379                    connection_id: *connection_id,
380                    format: format.clone(),
381                    max_file_size: *max_file_size,
382                    output_batch_count: None,
383                };
384
385                (
386                    QueryPlan::CopyTo(select_plan, copy_to_ctx),
387                    ExplainContext::None,
388                )
389            }
390            Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => {
391                // Only handle EXPLAIN FILTER PUSHDOWN for SELECT statements
392                match explainee {
393                    plan::Explainee::Statement(plan::ExplaineeStatement::Select {
394                        broken: false,
395                        plan,
396                        desc: _,
397                    }) => {
398                        let explain_ctx = ExplainContext::Pushdown;
399                        (QueryPlan::Select(plan), explain_ctx)
400                    }
401                    _ => {
402                        // This shouldn't happen because we already checked for this at the AST
403                        // level before calling `try_frontend_peek_inner`.
404                        soft_panic_or_log!(
405                            "unexpected EXPLAIN FILTER PUSHDOWN plan kind in frontend peek sequencing: {:?}",
406                            explainee
407                        );
408                        debug!(
409                            "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN"
410                        );
411                        return Ok(None);
412                    }
413                }
414            }
415            Plan::SideEffectingFunc(sef_plan) => {
416                // Side-effecting functions need Coordinator state (e.g., active_conns),
417                // so delegate to the Coordinator via a Command.
418                // The RBAC check is performed in the Coordinator where active_conns is available.
419                let response = self
420                    .call_coordinator(|tx| Command::ExecuteSideEffectingFunc {
421                        plan: sef_plan.clone(),
422                        conn_id: session.conn_id().clone(),
423                        current_role: session.role_metadata().current_role,
424                        tx,
425                    })
426                    .await?;
427                return Ok(Some(response));
428            }
429            Plan::Subscribe(subscribe) => (QueryPlan::Subscribe(subscribe), ExplainContext::None),
430            _ => {
431                // This shouldn't happen because we already checked for this at the AST
432                // level before calling `try_frontend_peek_inner`.
433                soft_panic_or_log!(
434                    "Unexpected plan kind in frontend peek sequencing: {:?}",
435                    plan
436                );
437                debug!(
438                    "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, side-effecting SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO S3"
439                );
440                return Ok(None);
441            }
442        };
443
444        let when = match query_plan {
445            QueryPlan::Select(s) => &s.when,
446            QueryPlan::CopyTo(s, _) => &s.when,
447            QueryPlan::Subscribe(s) => &s.when,
448        };
449
450        let depends_on = match query_plan {
451            QueryPlan::Select(s) => s.source.depends_on(),
452            QueryPlan::CopyTo(s, _) => s.source.depends_on(),
453            QueryPlan::Subscribe(s) => s.from.depends_on(),
454        };
455
456        let contains_temporal = match query_plan {
457            QueryPlan::Select(s) => s.source.contains_temporal(),
458            QueryPlan::CopyTo(s, _) => s.source.contains_temporal(),
459            QueryPlan::Subscribe(s) => s.from.contains_temporal(),
460        };
461
462        // # From sequence_plan
463
464        // We have checked the plan kind above.
465        assert!(plan.allowed_in_read_only());
466
467        let (cluster, target_cluster_id, target_cluster_name) = {
468            let target_cluster = match session.transaction().cluster() {
469                // Use the current transaction's cluster.
470                Some(cluster_id) => TargetCluster::Transaction(cluster_id),
471                // If there isn't a current cluster set for a transaction, then try to auto route.
472                None => coord::catalog_serving::auto_run_on_catalog_server(
473                    &conn_catalog,
474                    session,
475                    &plan,
476                ),
477            };
478            let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
479            (cluster, cluster.id, &cluster.name)
480        };
481
482        // Log cluster selection
483        if let Some(logging_id) = &statement_logging_id {
484            self.log_set_cluster(*logging_id, target_cluster_id, target_cluster_name.clone());
485        }
486
487        coord::catalog_serving::check_cluster_restrictions(
488            target_cluster_name.as_str(),
489            &conn_catalog,
490            &plan,
491        )?;
492
493        rbac::check_plan(
494            &conn_catalog,
495            // We can't look at `active_conns` here, but that's ok, because this case was handled
496            // above already inside `Command::ExecuteSideEffectingFunc`.
497            None::<fn(u32) -> Option<RoleId>>,
498            session,
499            &plan,
500            Some(target_cluster_id),
501            &resolved_ids,
502            &sql_impl_ids,
503        )?;
504
505        if let Some((_, wait_future)) =
506            coord::appends::waiting_on_startup_appends(&*catalog, session, &plan)
507        {
508            wait_future.await;
509        }
510
511        let max_query_result_size = Some(session.vars().max_query_result_size());
512
513        // # From sequence_peek
514
515        // # From peek_validate
516
517        let compute_instance_snapshot =
518            ComputeInstanceSnapshot::new_without_collections(cluster.id());
519
520        let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config())
521            .override_from(&catalog.get_cluster(cluster.id()).config.features())
522            // A cluster-scoped LaunchDarkly rule beats a manual `FEATURES` pin.
523            .override_from(
524                &catalog
525                    .state()
526                    .cluster_scoped_optimizer_overrides(cluster.id()),
527            )
528            .override_from(&explain_ctx);
529
530        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
531            return Err(AdapterError::NoClusterReplicasAvailable {
532                name: cluster.name.clone(),
533                is_managed: cluster.is_managed(),
534            });
535        }
536
537        let (_, view_id) = self.transient_id_gen.allocate_id();
538        let (_, index_id) = self.transient_id_gen.allocate_id();
539
540        let target_replica_name = session.vars().cluster_replica();
541        let mut target_replica = target_replica_name
542            .map(|name| {
543                cluster
544                    .replica_id(name)
545                    .ok_or(AdapterError::UnknownClusterReplica {
546                        cluster_name: cluster.name.clone(),
547                        replica_name: name.to_string(),
548                    })
549            })
550            .transpose()?;
551
552        let source_ids = depends_on;
553        // TODO(peek-seq): validate_timeline_context can be expensive in real scenarios (not in
554        // simple benchmarks), because it traverses transitive dependencies even of indexed views and
555        // materialized views (also traversing their MIR plans).
556        let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?;
557        if matches!(timeline_context, TimelineContext::TimestampIndependent) && contains_temporal {
558            // If the source IDs are timestamp independent but the query contains temporal functions,
559            // then the timeline context needs to be upgraded to timestamp dependent. This is
560            // required because `source_ids` doesn't contain functions.
561            timeline_context = TimelineContext::TimestampDependent;
562        }
563
564        let notices = coord::sequencer::check_log_reads(
565            &catalog,
566            cluster,
567            &source_ids,
568            &mut target_replica,
569            session.vars(),
570        )?;
571        session.add_notices(notices);
572
573        // # From peek_linearize_timestamp
574
575        let isolation_level = session.vars().transaction_isolation().clone();
576        let timeline = Coordinator::get_timeline(&timeline_context);
577        let needs_linearized_read_ts =
578            Coordinator::needs_linearized_read_ts(&isolation_level, when);
579
580        let oracle_read_ts = match timeline {
581            Some(timeline) if needs_linearized_read_ts => {
582                let oracle = self.ensure_oracle(timeline).await?;
583                let oracle_read_ts = oracle.read_ts().await;
584                Some(oracle_read_ts)
585            }
586            Some(_) | None => None,
587        };
588
589        // # From peek_real_time_recency
590
591        let vars = session.vars();
592        let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
593            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
594            && !session.contains_read_timestamp()
595        {
596            // Only call the coordinator when we actually need real-time recency
597            self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
598                source_ids: source_ids.clone(),
599                real_time_recency_timeout: *vars.real_time_recency_timeout(),
600                tx,
601            })
602            .await?
603        } else {
604            None
605        };
606
607        // # From peek_timestamp_read_hold
608
609        let dataflow_builder =
610            DataflowBuilder::new(catalog.state(), compute_instance_snapshot.clone());
611        let input_id_bundle = dataflow_builder.sufficient_collections(source_ids.clone());
612
613        // ## From sequence_peek_timestamp
614
615        // Warning: This will be false for AS OF queries, even if we are otherwise inside a
616        // multi-statement transaction. (It's also false for FreshestTableWrite, which is currently
617        // only read-then-write queries, which can't be part of multi-statement transactions, so
618        // FreshestTableWrite doesn't matter.)
619        //
620        // TODO(peek-seq): It's not totally clear to me what the intended semantics are for AS OF
621        // queries inside a transaction: We clearly can't use the transaction timestamp, but the old
622        // peek sequencing still does a timedomain validation. The new peek sequencing does not do
623        // timedomain validation for AS OF queries, which seems more natural. But I'm thinking that
624        // it would be the cleanest to just simply disallow AS OF queries inside transactions.
625        let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when)
626            && !matches!(query_plan, QueryPlan::Subscribe { .. });
627
628        // Fetch or generate a timestamp for this query and fetch or acquire read holds.
629        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
630            // Use the transaction's timestamp if it exists and this isn't an AS OF query.
631            // (`in_immediate_multi_stmt_txn` is false for AS OF queries.)
632            Some(
633                determination @ TimestampDetermination {
634                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
635                    ..
636                },
637            ) if in_immediate_multi_stmt_txn => {
638                // This is a subsequent (non-AS OF, non-constant) query in a multi-statement
639                // transaction. We now:
640                // - Validate that the query only accesses collections within the transaction's
641                //   timedomain (which we know from the stored read holds).
642                // - Use the transaction's stored timestamp determination.
643                // - Use the (relevant subset of the) transaction's read holds.
644
645                let txn_read_holds_opt = self
646                    .call_coordinator(|tx| Command::GetTransactionReadHoldsBundle {
647                        conn_id: session.conn_id().clone(),
648                        tx,
649                    })
650                    .await;
651
652                if let Some(txn_read_holds) = txn_read_holds_opt {
653                    let allowed_id_bundle = txn_read_holds.id_bundle();
654                    let outside = input_id_bundle.difference(&allowed_id_bundle);
655
656                    // Queries without a timestamp and timeline can belong to any existing timedomain.
657                    if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
658                        let valid_names =
659                            allowed_id_bundle.resolve_names(&*catalog, session.conn_id());
660                        let invalid_names = outside.resolve_names(&*catalog, session.conn_id());
661                        return Err(AdapterError::RelationOutsideTimeDomain {
662                            relations: invalid_names,
663                            names: valid_names,
664                        });
665                    }
666
667                    // Extract the subset of read holds for the collections this query accesses.
668                    let read_holds = txn_read_holds.subset(&input_id_bundle);
669
670                    (determination, read_holds)
671                } else {
672                    // This should never happen: we're in a subsequent query of a multi-statement
673                    // transaction (we have a transaction timestamp), but the coordinator has no
674                    // transaction read holds stored. This indicates a bug in the transaction
675                    // handling.
676                    return Err(AdapterError::Internal(
677                        "Missing transaction read holds for multi-statement transaction"
678                            .to_string(),
679                    ));
680                }
681            }
682            _ => {
683                // There is no timestamp determination yet for this transaction. Either:
684                // - We are not in a multi-statement transaction.
685                // - This is the first (non-AS OF) query in a multi-statement transaction.
686                // - This is an AS OF query.
687                // - This is a constant query (`TimestampContext::NoTimestamp`).
688
689                let timedomain_bundle;
690                let determine_bundle = if in_immediate_multi_stmt_txn {
691                    // This is the first (non-AS OF) query in a multi-statement transaction.
692                    // Determine a timestamp that will be valid for anything in any schema
693                    // referenced by the first query.
694                    timedomain_bundle = timedomain_for(
695                        &*catalog,
696                        &dataflow_builder,
697                        &source_ids,
698                        &timeline_context,
699                        session.conn_id(),
700                        target_cluster_id,
701                    )?;
702                    &timedomain_bundle
703                } else {
704                    // Simply use the inputs of the current query.
705                    &input_id_bundle
706                };
707                let (determination, read_holds) = self
708                    .frontend_determine_timestamp(
709                        session,
710                        determine_bundle,
711                        when,
712                        target_cluster_id,
713                        &timeline_context,
714                        oracle_read_ts,
715                        real_time_recency_ts,
716                    )
717                    .await?;
718
719                // If this is the first (non-AS OF) query in a multi-statement transaction, store
720                // the read holds in the coordinator, so subsequent queries can validate against
721                // them.
722                if in_immediate_multi_stmt_txn {
723                    self.call_coordinator(|tx| Command::StoreTransactionReadHolds {
724                        conn_id: session.conn_id().clone(),
725                        read_holds: read_holds.clone(),
726                        tx,
727                    })
728                    .await;
729                }
730
731                (determination, read_holds)
732            }
733        };
734
735        {
736            // Assert that we have a read hold for all the collections in our `input_id_bundle`.
737            for id in input_id_bundle.iter() {
738                let s = read_holds.storage_holds.contains_key(&id);
739                let c = read_holds
740                    .compute_ids()
741                    .map(|(_instance, coll)| coll)
742                    .contains(&id);
743                soft_assert_or_log!(
744                    s || c,
745                    "missing read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
746                    id,
747                    in_immediate_multi_stmt_txn,
748                );
749            }
750
751            // Assert that each part of the `input_id_bundle` corresponds to the right part of
752            // `read_holds`.
753            for id in input_id_bundle.storage_ids.iter() {
754                soft_assert_or_log!(
755                    read_holds.storage_holds.contains_key(id),
756                    "missing storage read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
757                    id,
758                    in_immediate_multi_stmt_txn,
759                );
760            }
761            for id in input_id_bundle
762                .compute_ids
763                .iter()
764                .flat_map(|(_instance, colls)| colls)
765            {
766                soft_assert_or_log!(
767                    read_holds
768                        .compute_ids()
769                        .map(|(_instance, coll)| coll)
770                        .contains(id),
771                    "missing compute read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
772                    id,
773                    in_immediate_multi_stmt_txn,
774                );
775            }
776        }
777
778        // (TODO(peek-seq): The below TODO is copied from the old peek sequencing. We should resolve
779        // this when we decide what to do with `AS OF` in transactions.)
780        // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
781        // arbitrary and we don't recall why we did it (possibly an error!). Change this to always
782        // set the transaction ops. Decide and document what our policy should be on AS OF queries.
783        // Maybe they shouldn't be allowed in transactions at all because it's hard to explain
784        // what's going on there. This should probably get a small design document.
785
786        // We only track the peeks in the session if the query doesn't use AS
787        // OF or we're inside an explicit transaction. The latter case is
788        // necessary to support PG's `BEGIN` semantics, whose behavior can
789        // depend on whether or not reads have occurred in the txn.
790        let requires_linearization = (&explain_ctx).into();
791        let mut transaction_determination = determination.clone();
792        match query_plan {
793            QueryPlan::Subscribe { .. } => {
794                if when.is_transactional() {
795                    session.add_transaction_ops(TransactionOps::Subscribe)?;
796                }
797            }
798            QueryPlan::Select(..) | QueryPlan::CopyTo(..) => {
799                if when.is_transactional() {
800                    session.add_transaction_ops(TransactionOps::Peeks {
801                        determination: transaction_determination,
802                        cluster_id: target_cluster_id,
803                        requires_linearization,
804                    })?;
805                } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
806                    // If the query uses AS OF, then ignore the timestamp.
807                    transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
808                    session.add_transaction_ops(TransactionOps::Peeks {
809                        determination: transaction_determination,
810                        cluster_id: target_cluster_id,
811                        requires_linearization,
812                    })?;
813                }
814            }
815        }
816
817        // # From peek_optimize
818
819        let stats = statistics_oracle(
820            session,
821            &source_ids,
822            &determination.timestamp_context.antichain(),
823            true,
824            catalog.system_config(),
825            &*self.storage_collections,
826        )
827        .await
828        .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
829
830        // Generate data structures that can be moved to another task where we will perform possibly
831        // expensive optimizations.
832        let timestamp_context = determination.timestamp_context.clone();
833        let session_meta = session.meta();
834        let now = catalog.config().now.clone();
835        let target_cluster_name = target_cluster_name.clone();
836        let needs_plan_insights = explain_ctx.needs_plan_insights();
837        let determination_for_pushdown = if matches!(explain_ctx, ExplainContext::Pushdown) {
838            // This is a hairy data structure, so avoid this clone if we are not in
839            // EXPLAIN FILTER PUSHDOWN.
840            Some(determination.clone())
841        } else {
842            None
843        };
844
845        let span = Span::current();
846
847        // Prepare data for plan insights if needed
848        let catalog_for_insights = if needs_plan_insights {
849            Some(Arc::clone(&catalog))
850        } else {
851            None
852        };
853        let mut compute_instances = BTreeMap::new();
854        if needs_plan_insights {
855            for user_cluster in catalog.user_clusters() {
856                let snapshot = ComputeInstanceSnapshot::new_without_collections(user_cluster.id);
857                compute_instances.insert(user_cluster.name.clone(), snapshot);
858            }
859        }
860
861        let source_ids_for_closure = source_ids.clone();
862
863        let optimization_future: JoinHandle<Result<_, AdapterError>> = match query_plan {
864            QueryPlan::CopyTo(select_plan, mut copy_to_ctx) => {
865                let raw_expr = select_plan.source.clone();
866
867                // COPY TO path: calculate output_batch_count and create copy_to optimizer
868                let worker_counts = cluster.replicas().map(|r| {
869                    let loc = &r.config.location;
870                    loc.workers().unwrap_or_else(|| loc.num_processes())
871                });
872                let max_worker_count = match worker_counts.max() {
873                    Some(count) => u64::cast_from(count),
874                    None => {
875                        return Err(AdapterError::NoClusterReplicasAvailable {
876                            name: cluster.name.clone(),
877                            is_managed: cluster.is_managed(),
878                        });
879                    }
880                };
881                copy_to_ctx.output_batch_count = Some(max_worker_count);
882
883                let mut optimizer = optimize::copy_to::Optimizer::new(
884                    Arc::clone(&catalog),
885                    compute_instance_snapshot,
886                    view_id,
887                    copy_to_ctx,
888                    optimizer_config,
889                    self.optimizer_metrics.clone(),
890                );
891
892                mz_ore::task::spawn_blocking(
893                    || "optimize copy-to",
894                    move || {
895                        span.in_scope(|| {
896                            let _dispatch_guard = explain_ctx.dispatch_guard();
897
898                            // COPY TO path: HIR ⇒ local MIR ⇒ resolve ⇒ global LIR.
899                            let global_lir_plan = optimize::optimize_oneshot(
900                                &mut optimizer,
901                                raw_expr.clone(),
902                                |local_mir_plan| {
903                                    local_mir_plan.resolve(
904                                        timestamp_context.clone(),
905                                        &session_meta,
906                                        stats,
907                                    )
908                                },
909                            )?;
910                            Ok(Execution::CopyToS3 {
911                                global_lir_plan,
912                                source_ids: source_ids_for_closure,
913                            })
914                        })
915                    },
916                )
917            }
918            QueryPlan::Select(select_plan) => {
919                let select_plan = select_plan.clone();
920                let raw_expr = select_plan.source.clone();
921
922                // SELECT/EXPLAIN path: create peek optimizer
923                let mut optimizer = optimize::peek::Optimizer::new(
924                    Arc::clone(&catalog),
925                    compute_instance_snapshot,
926                    select_plan.finishing.clone(),
927                    view_id,
928                    index_id,
929                    optimizer_config,
930                    self.optimizer_metrics.clone(),
931                );
932
933                mz_ore::task::spawn_blocking(
934                    || "optimize peek",
935                    move || {
936                        span.in_scope(|| {
937                            let _dispatch_guard = explain_ctx.dispatch_guard();
938
939                            // SELECT/EXPLAIN path: HIR ⇒ local MIR ⇒ resolve ⇒
940                            // global LIR. We capture the result (rather than
941                            // propagating with `?`) so that a failure can still be
942                            // routed to `EXPLAIN BROKEN` below.
943                            let global_lir_plan_result = optimize::optimize_oneshot(
944                                &mut optimizer,
945                                raw_expr.clone(),
946                                |local_mir_plan| {
947                                    local_mir_plan.resolve(
948                                        timestamp_context.clone(),
949                                        &session_meta,
950                                        stats,
951                                    )
952                                },
953                            )
954                            .map_err(AdapterError::from);
955                            let optimization_finished_at = now();
956
957                            let create_insights_ctx =
958                                |optimizer: &optimize::peek::Optimizer,
959                                 is_notice: bool|
960                                 -> Option<Box<PlanInsightsContext>> {
961                                    if !needs_plan_insights {
962                                        return None;
963                                    }
964
965                                    let catalog = catalog_for_insights.as_ref()?;
966
967                                    let enable_re_optimize = if needs_plan_insights {
968                                        // Disable any plan insights that use the optimizer if we only want the
969                                        // notice and plan optimization took longer than the threshold. This is
970                                        // to prevent a situation where optimizing takes a while and there are
971                                        // lots of clusters, which would delay peek execution by the product of
972                                        // those.
973                                        //
974                                        // (This heuristic doesn't work well, see #9492.)
975                                        let dyncfgs = catalog.system_config().dyncfgs();
976                                        let opt_limit = mz_adapter_types::dyncfgs
977                                        ::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
978                                            .get(dyncfgs);
979                                        !(is_notice && optimizer.duration() > opt_limit)
980                                    } else {
981                                        false
982                                    };
983
984                                    Some(Box::new(PlanInsightsContext {
985                                        stmt: select_plan
986                                            .select
987                                            .as_deref()
988                                            .map(Clone::clone)
989                                            .map(Statement::Select),
990                                        raw_expr: raw_expr.clone(),
991                                        catalog: Arc::clone(catalog),
992                                        compute_instances,
993                                        target_instance: target_cluster_name,
994                                        metrics: optimizer.metrics().clone(),
995                                        finishing: optimizer.finishing().clone(),
996                                        optimizer_config: optimizer.config().clone(),
997                                        session: session_meta,
998                                        timestamp_context,
999                                        view_id: optimizer.select_id(),
1000                                        index_id: optimizer.index_id(),
1001                                        enable_re_optimize,
1002                                    }))
1003                                };
1004
1005                            let global_lir_plan = match global_lir_plan_result {
1006                                Ok(plan) => plan,
1007                                Err(err) => {
1008                                    let result = if let ExplainContext::Plan(explain_ctx) =
1009                                        explain_ctx
1010                                        && explain_ctx.broken
1011                                    {
1012                                        // EXPLAIN BROKEN: log error and continue with defaults
1013                                        tracing::error!(
1014                                            "error while handling EXPLAIN statement: {}",
1015                                            err
1016                                        );
1017                                        Ok(Execution::ExplainPlan {
1018                                            df_meta: Default::default(),
1019                                            explain_ctx,
1020                                            optimizer,
1021                                            insights_ctx: None,
1022                                        })
1023                                    } else {
1024                                        Err(err)
1025                                    };
1026                                    return result;
1027                                }
1028                            };
1029
1030                            match explain_ctx {
1031                                ExplainContext::Plan(explain_ctx) => {
1032                                    let (_, df_meta, _) = global_lir_plan.unapply();
1033                                    let insights_ctx = create_insights_ctx(&optimizer, false);
1034                                    Ok(Execution::ExplainPlan {
1035                                        df_meta,
1036                                        explain_ctx,
1037                                        optimizer,
1038                                        insights_ctx,
1039                                    })
1040                                }
1041                                ExplainContext::None => Ok(Execution::Peek {
1042                                    global_lir_plan,
1043                                    optimization_finished_at,
1044                                    plan_insights_optimizer_trace: None,
1045                                    finishing: select_plan.finishing,
1046                                    copy_to: select_plan.copy_to,
1047                                    insights_ctx: None,
1048                                }),
1049                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
1050                                    let insights_ctx = create_insights_ctx(&optimizer, true);
1051                                    Ok(Execution::Peek {
1052                                        global_lir_plan,
1053                                        optimization_finished_at,
1054                                        plan_insights_optimizer_trace: Some(optimizer_trace),
1055                                        finishing: select_plan.finishing,
1056                                        copy_to: select_plan.copy_to,
1057                                        insights_ctx,
1058                                    })
1059                                }
1060                                ExplainContext::Pushdown => {
1061                                    let (plan, _, _) = global_lir_plan.unapply();
1062                                    let imports = match plan {
1063                                        PeekPlan::SlowPath(plan) => plan
1064                                            .desc
1065                                            .source_imports
1066                                            .into_iter()
1067                                            .filter_map(|(id, import)| {
1068                                                import.desc.arguments.operators.map(|mfp| (id, mfp))
1069                                            })
1070                                            .collect(),
1071                                        PeekPlan::FastPath(_) => {
1072                                            std::collections::BTreeMap::default()
1073                                        }
1074                                    };
1075                                    Ok(Execution::ExplainPushdown {
1076                                        imports,
1077                                        determination: determination_for_pushdown
1078                                            .expect("it's present for the ExplainPushdown case"),
1079                                    })
1080                                }
1081                            }
1082                        })
1083                    },
1084                )
1085            }
1086            QueryPlan::Subscribe(plan) => {
1087                let plan = plan.clone();
1088                let catalog: Arc<Catalog> = Arc::clone(&catalog);
1089                let debug_name = format!("subscribe-{}", index_id);
1090                let mut optimizer = optimize::subscribe::Optimizer::new(
1091                    catalog,
1092                    compute_instance_snapshot.clone(),
1093                    view_id,
1094                    index_id,
1095                    plan.with_snapshot,
1096                    plan.up_to,
1097                    debug_name,
1098                    optimizer_config,
1099                    self.optimizer_metrics.clone(),
1100                );
1101                mz_ore::task::spawn_blocking(
1102                    || "optimize subscribe",
1103                    move || {
1104                        span.in_scope(|| {
1105                            let _dispatch_guard = explain_ctx.dispatch_guard();
1106
1107                            let global_mir_plan = optimizer.catch_unwind_optimize(plan.clone())?;
1108                            let as_of = timestamp_context.timestamp_or_default();
1109
1110                            if let Some(up_to) = optimizer.up_to() {
1111                                if as_of > up_to {
1112                                    return Err(AdapterError::AbsurdSubscribeBounds {
1113                                        as_of,
1114                                        up_to,
1115                                    });
1116                                }
1117                            }
1118                            let local_mir_plan =
1119                                global_mir_plan.resolve(Antichain::from_elem(as_of));
1120
1121                            let global_lir_plan =
1122                                optimizer.catch_unwind_optimize(local_mir_plan)?;
1123                            let optimization_finished_at = now();
1124
1125                            let (df_desc, df_meta) = global_lir_plan.unapply();
1126                            Ok(Execution::Subscribe {
1127                                subscribe_plan: plan,
1128                                df_desc,
1129                                df_meta,
1130                                optimization_finished_at,
1131                            })
1132                        })
1133                    },
1134                )
1135            }
1136        };
1137
1138        let mut optimization_timeout = *session.vars().statement_timeout();
1139        // Timeout of 0 is equivalent to "off", meaning we will wait "forever."
1140        if optimization_timeout == Duration::ZERO {
1141            optimization_timeout = Duration::MAX;
1142        }
1143        let optimization_result =
1144            // Note: spawn_blocking tasks cannot be cancelled, so on timeout we stop waiting but the
1145            // optimization task continues running in the background until completion. See
1146            // https://github.com/MaterializeInc/database-issues/issues/8644 for properly cancelling
1147            // optimizer runs.
1148            match tokio::time::timeout(optimization_timeout, optimization_future).await {
1149                Ok(Ok(result)) => result,
1150                Ok(Err(AdapterError::Optimizer(err))) => {
1151                    return Err(AdapterError::Internal(format!(
1152                        "internal error in optimizer: {}",
1153                        err
1154                    )));
1155                }
1156                Ok(Err(err)) => {
1157                    return Err(err);
1158                }
1159                Err(_elapsed) => {
1160                    warn!("optimize peek timed out after {:?}", optimization_timeout);
1161                    return Err(AdapterError::StatementTimeout);
1162                }
1163            };
1164
1165        // Log optimization finished
1166        if let Some(logging_id) = &statement_logging_id {
1167            self.log_lifecycle_event(*logging_id, StatementLifecycleEvent::OptimizationFinished);
1168        }
1169
1170        // Assert that read holds are correct for the execution plan
1171        Self::assert_read_holds_correct(
1172            &read_holds,
1173            &optimization_result,
1174            &determination,
1175            target_cluster_id,
1176            in_immediate_multi_stmt_txn,
1177        );
1178
1179        // Handle the optimization result: either generate EXPLAIN output or continue with execution
1180        match optimization_result {
1181            Execution::ExplainPlan {
1182                df_meta,
1183                explain_ctx,
1184                optimizer,
1185                insights_ctx,
1186            } => {
1187                let rows = coord::sequencer::explain_plan_inner(
1188                    session,
1189                    &catalog,
1190                    df_meta,
1191                    explain_ctx,
1192                    optimizer,
1193                    insights_ctx,
1194                )
1195                .await?;
1196
1197                Ok(Some(ExecuteResponse::SendingRowsImmediate {
1198                    rows: Box::new(rows.into_row_iter()),
1199                }))
1200            }
1201            Execution::ExplainPushdown {
1202                imports,
1203                determination,
1204            } => {
1205                // # From peek_explain_pushdown
1206
1207                let as_of = determination.timestamp_context.antichain();
1208                let mz_now = determination
1209                    .timestamp_context
1210                    .timestamp()
1211                    .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1212                    .unwrap_or_else(ResultSpec::value_all);
1213
1214                Ok(Some(
1215                    coord::sequencer::explain_pushdown_future_inner(
1216                        session,
1217                        &*catalog,
1218                        &self.storage_collections,
1219                        as_of,
1220                        mz_now,
1221                        imports,
1222                    )
1223                    .await
1224                    .await?,
1225                ))
1226            }
1227            Execution::Peek {
1228                global_lir_plan,
1229                optimization_finished_at: _optimization_finished_at,
1230                plan_insights_optimizer_trace,
1231                finishing,
1232                copy_to,
1233                insights_ctx,
1234            } => {
1235                // Continue with normal execution
1236                // # From peek_finish
1237
1238                // The typ here was generated from the HIR SQL type and simply stored in LIR.
1239                let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
1240
1241                coord::sequencer::emit_optimizer_notices(
1242                    &*catalog,
1243                    session,
1244                    &df_meta.optimizer_notices,
1245                );
1246
1247                // Generate plan insights notice if needed
1248                if let Some(trace) = plan_insights_optimizer_trace {
1249                    let target_cluster = catalog.get_cluster(target_cluster_id);
1250                    let features = OptimizerFeatures::from(catalog.system_config())
1251                        .override_from(&target_cluster.config.features())
1252                        // A cluster-scoped LaunchDarkly rule beats a manual
1253                        // `FEATURES` pin.
1254                        .override_from(
1255                            &catalog
1256                                .state()
1257                                .cluster_scoped_optimizer_overrides(target_cluster_id),
1258                        );
1259                    let insights = trace
1260                        .into_plan_insights(
1261                            &features,
1262                            &catalog.for_session(session),
1263                            Some(finishing.clone()),
1264                            Some(target_cluster),
1265                            df_meta.clone(),
1266                            insights_ctx,
1267                        )
1268                        .await?;
1269                    session.add_notice(AdapterNotice::PlanInsights(insights));
1270                }
1271
1272                // # Now back to peek_finish
1273
1274                let watch_set = statement_logging_id.map(|logging_id| {
1275                    WatchSetCreation::new(
1276                        logging_id,
1277                        catalog.state(),
1278                        &input_id_bundle,
1279                        determination.timestamp_context.timestamp_or_default(),
1280                    )
1281                });
1282
1283                let max_result_size = catalog.system_config().max_result_size();
1284
1285                // Clone determination if we need it for emit_timestamp_notice, since it may be
1286                // moved into Command::ExecuteSlowPathPeek.
1287                let determination_for_notice = if session.vars().emit_timestamp_notice() {
1288                    Some(determination.clone())
1289                } else {
1290                    None
1291                };
1292
1293                let response = match peek_plan {
1294                    PeekPlan::FastPath(fast_path_plan) => {
1295                        if let Some(logging_id) = &statement_logging_id {
1296                            // TODO(peek-seq): Actually, we should log it also for
1297                            // FastPathPlan::Constant. The only reason we are not doing so at the
1298                            // moment is to match the old peek sequencing, so that statement logging
1299                            // tests pass with the frontend peek sequencing turned both on and off.
1300                            //
1301                            // When the old sequencing is removed, we should make a couple of
1302                            // changes in how we log timestamps:
1303                            // - Move this up to just after timestamp determination, so that it
1304                            //   appears in the log as soon as possible.
1305                            // - Do it also for Constant peeks.
1306                            // - Currently, slow-path peeks' timestamp logging is done by
1307                            //   `implement_peek_plan`. We could remove it from there, and just do
1308                            //   it here.
1309                            if !matches!(fast_path_plan, FastPathPlan::Constant(..)) {
1310                                self.log_set_timestamp(
1311                                    *logging_id,
1312                                    determination.timestamp_context.timestamp_or_default(),
1313                                );
1314                            }
1315                        }
1316
1317                        let row_set_finishing_seconds =
1318                            session.metrics().row_set_finishing_seconds().clone();
1319
1320                        let peek_stash_read_batch_size_bytes =
1321                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
1322                                .get(catalog.system_config().dyncfgs());
1323                        let peek_stash_read_memory_budget_bytes =
1324                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
1325                                .get(catalog.system_config().dyncfgs());
1326
1327                        self.implement_fast_path_peek_plan(
1328                            fast_path_plan,
1329                            determination.timestamp_context.timestamp_or_default(),
1330                            finishing,
1331                            target_cluster_id,
1332                            target_replica,
1333                            typ,
1334                            max_result_size,
1335                            max_query_result_size,
1336                            row_set_finishing_seconds,
1337                            read_holds,
1338                            peek_stash_read_batch_size_bytes,
1339                            peek_stash_read_memory_budget_bytes,
1340                            session.conn_id().clone(),
1341                            source_ids,
1342                            watch_set,
1343                        )
1344                        .await?
1345                    }
1346                    PeekPlan::SlowPath(dataflow_plan) => {
1347                        if let Some(logging_id) = &statement_logging_id {
1348                            self.log_set_transient_index_id(*logging_id, dataflow_plan.id);
1349                        }
1350
1351                        self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
1352                            dataflow_plan: Box::new(dataflow_plan),
1353                            determination,
1354                            finishing,
1355                            compute_instance: target_cluster_id,
1356                            target_replica,
1357                            intermediate_result_type: typ,
1358                            source_ids,
1359                            conn_id: session.conn_id().clone(),
1360                            max_result_size,
1361                            max_query_result_size,
1362                            watch_set,
1363                            tx,
1364                        })
1365                        .await?
1366                    }
1367                };
1368
1369                // Add timestamp notice if emit_timestamp_notice is enabled
1370                if let Some(determination) = determination_for_notice {
1371                    let explanation = self
1372                        .call_coordinator(|tx| Command::ExplainTimestamp {
1373                            conn_id: session.conn_id().clone(),
1374                            session_wall_time: session.pcx().wall_time,
1375                            cluster_id: target_cluster_id,
1376                            id_bundle: input_id_bundle.clone(),
1377                            determination,
1378                            tx,
1379                        })
1380                        .await;
1381                    session.add_notice(AdapterNotice::QueryTimestamp { explanation });
1382                }
1383
1384                Ok(Some(match copy_to {
1385                    None => response,
1386                    // COPY TO STDOUT
1387                    Some(format) => ExecuteResponse::CopyTo {
1388                        format,
1389                        resp: Box::new(response),
1390                    },
1391                }))
1392            }
1393            Execution::Subscribe {
1394                subscribe_plan,
1395                df_desc,
1396                df_meta,
1397                optimization_finished_at: _optimization_finished_at,
1398            } => {
1399                if df_desc.as_of.as_ref().expect("as of set") == &df_desc.until {
1400                    session.add_notice(AdapterNotice::EqualSubscribeBounds {
1401                        bound: *df_desc.until.as_option().expect("as of set"),
1402                    });
1403                }
1404                coord::sequencer::emit_optimizer_notices(
1405                    &*catalog,
1406                    session,
1407                    &df_meta.optimizer_notices,
1408                );
1409
1410                let response = self
1411                    .call_coordinator(|tx| Command::ExecuteSubscribe {
1412                        df_desc,
1413                        dependency_ids: subscribe_plan.from.depends_on(),
1414                        cluster_id: target_cluster_id,
1415                        replica_id: target_replica,
1416                        conn_id: session.conn_id().clone(),
1417                        session_uuid: session.uuid(),
1418                        read_holds,
1419                        plan: subscribe_plan,
1420                        statement_logging_id,
1421                        tx,
1422                    })
1423                    .await?;
1424                Ok(Some(response))
1425            }
1426            Execution::CopyToS3 {
1427                global_lir_plan,
1428                source_ids,
1429            } => {
1430                let (df_desc, df_meta) = global_lir_plan.unapply();
1431
1432                coord::sequencer::emit_optimizer_notices(
1433                    &*catalog,
1434                    session,
1435                    &df_meta.optimizer_notices,
1436                );
1437
1438                // Extract S3 sink connection info for preflight check
1439                let sink_id = df_desc.sink_id();
1440                let sinks = &df_desc.sink_exports;
1441                if sinks.len() != 1 {
1442                    return Err(AdapterError::Internal(
1443                        "expected exactly one copy to s3 sink".into(),
1444                    ));
1445                }
1446                let (_, sink_desc) = sinks
1447                    .first_key_value()
1448                    .expect("known to be exactly one copy to s3 sink");
1449                let s3_sink_connection = match &sink_desc.connection {
1450                    mz_compute_types::sinks::ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1451                        conn.clone()
1452                    }
1453                    _ => {
1454                        return Err(AdapterError::Internal(
1455                            "expected copy to s3 oneshot sink".into(),
1456                        ));
1457                    }
1458                };
1459
1460                // Perform S3 preflight check in background task (via coordinator).
1461                // This runs slow S3 operations without blocking the coordinator's main task.
1462                self.call_coordinator(|tx| Command::CopyToPreflight {
1463                    s3_sink_connection,
1464                    sink_id,
1465                    tx,
1466                })
1467                .await?;
1468
1469                // Preflight succeeded, now execute the actual COPY TO dataflow
1470                let watch_set = statement_logging_id.map(|logging_id| {
1471                    WatchSetCreation::new(
1472                        logging_id,
1473                        catalog.state(),
1474                        &input_id_bundle,
1475                        determination.timestamp_context.timestamp_or_default(),
1476                    )
1477                });
1478
1479                let response = self
1480                    .call_coordinator(|tx| Command::ExecuteCopyTo {
1481                        df_desc: Box::new(df_desc),
1482                        compute_instance: target_cluster_id,
1483                        target_replica,
1484                        source_ids,
1485                        conn_id: session.conn_id().clone(),
1486                        watch_set,
1487                        tx,
1488                    })
1489                    .await?;
1490
1491                Ok(Some(response))
1492            }
1493        }
1494    }
1495
1496    /// (Similar to Coordinator::determine_timestamp)
1497    /// Determines the timestamp for a query, acquires read holds that ensure the
1498    /// query remains executable at that time, and returns those.
1499    /// The caller is responsible for eventually dropping those read holds.
1500    ///
1501    /// Note: self is taken &mut because of the lazy fetching in `get_compute_instance_client`.
1502    pub(crate) async fn frontend_determine_timestamp(
1503        &mut self,
1504        session: &Session,
1505        id_bundle: &CollectionIdBundle,
1506        when: &QueryWhen,
1507        compute_instance: ComputeInstanceId,
1508        timeline_context: &TimelineContext,
1509        oracle_read_ts: Option<Timestamp>,
1510        real_time_recency_ts: Option<Timestamp>,
1511    ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
1512        // this is copy-pasted from Coordinator
1513
1514        let isolation_level = session.vars().transaction_isolation();
1515
1516        let (read_holds, upper) = self
1517            .acquire_read_holds_and_least_valid_write(id_bundle)
1518            .await
1519            .map_err(|err| {
1520                AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
1521                    err,
1522                    compute_instance,
1523                )
1524            })?;
1525        let (det, read_holds) = <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1526            session,
1527            id_bundle,
1528            when,
1529            timeline_context,
1530            oracle_read_ts,
1531            real_time_recency_ts,
1532            isolation_level,
1533            read_holds,
1534            upper.clone(),
1535        )?;
1536
1537        session
1538            .metrics()
1539            .determine_timestamp(&[
1540                match det.respond_immediately() {
1541                    true => "true",
1542                    false => "false",
1543                },
1544                isolation_level.as_variant_str(),
1545                &compute_instance.to_string(),
1546            ])
1547            .inc();
1548        if !det.respond_immediately()
1549            && isolation_level == &IsolationLevel::StrictSerializable
1550            && real_time_recency_ts.is_none()
1551        {
1552            // Note down the difference between StrictSerializable and Serializable into a metric.
1553            if let Some(strict) = det.timestamp_context.timestamp() {
1554                let (serializable_det, _tmp_read_holds) =
1555                    <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1556                        session,
1557                        id_bundle,
1558                        when,
1559                        timeline_context,
1560                        oracle_read_ts,
1561                        real_time_recency_ts,
1562                        &IsolationLevel::Serializable,
1563                        read_holds.clone(),
1564                        upper.clone(),
1565                    )?;
1566                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
1567                    session
1568                        .metrics()
1569                        .timestamp_difference_for_strict_serializable_ms(&[compute_instance
1570                            .to_string()
1571                            .as_ref()])
1572                        .observe(f64::cast_lossy(u64::from(
1573                            strict.saturating_sub(*serializable),
1574                        )));
1575                }
1576            }
1577        }
1578        if !det.respond_immediately()
1579            && isolation_level.is_bounded_staleness()
1580            && real_time_recency_ts.is_none()
1581        {
1582            // Note down the difference between BoundedStaleness and Serializable into a metric.
1583            if let Some(bs_ts) = det.timestamp_context.timestamp() {
1584                let (serializable_det, _tmp_read_holds) =
1585                    <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1586                        session,
1587                        id_bundle,
1588                        when,
1589                        timeline_context,
1590                        oracle_read_ts,
1591                        real_time_recency_ts,
1592                        &IsolationLevel::Serializable,
1593                        read_holds.clone(),
1594                        upper,
1595                    )?;
1596                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
1597                    session
1598                        .metrics()
1599                        .timestamp_difference_for_bounded_staleness_ms(&[compute_instance
1600                            .to_string()
1601                            .as_ref()])
1602                        .observe(f64::cast_lossy(u64::from(
1603                            serializable.saturating_sub(*bs_ts),
1604                        )));
1605                }
1606            }
1607        }
1608
1609        Ok((det, read_holds))
1610    }
1611
1612    fn assert_read_holds_correct(
1613        read_holds: &ReadHolds,
1614        execution: &Execution,
1615        determination: &TimestampDetermination,
1616        target_cluster_id: ClusterId,
1617        in_immediate_multi_stmt_txn: bool,
1618    ) {
1619        // Extract source_imports, index_imports, as_of, and execution_name based on Execution variant
1620        let (source_imports, index_imports, as_of, execution_name): (
1621            Vec<GlobalId>,
1622            Vec<GlobalId>,
1623            Timestamp,
1624            &str,
1625        ) = match execution {
1626            Execution::Peek {
1627                global_lir_plan, ..
1628            } => match global_lir_plan.peek_plan() {
1629                PeekPlan::FastPath(fast_path_plan) => {
1630                    let (sources, indexes) = match fast_path_plan {
1631                        FastPathPlan::Constant(..) => (vec![], vec![]),
1632                        FastPathPlan::PeekExisting(_coll_id, idx_id, ..) => (vec![], vec![*idx_id]),
1633                        FastPathPlan::PeekPersist(global_id, ..) => (vec![*global_id], vec![]),
1634                    };
1635                    (
1636                        sources,
1637                        indexes,
1638                        determination.timestamp_context.timestamp_or_default(),
1639                        "FastPath",
1640                    )
1641                }
1642                PeekPlan::SlowPath(dataflow_plan) => {
1643                    let as_of = dataflow_plan
1644                        .desc
1645                        .as_of
1646                        .clone()
1647                        .expect("dataflow has an as_of")
1648                        .into_element();
1649                    (
1650                        dataflow_plan.desc.source_imports.keys().cloned().collect(),
1651                        dataflow_plan.desc.index_imports.keys().cloned().collect(),
1652                        as_of,
1653                        "SlowPath",
1654                    )
1655                }
1656            },
1657            Execution::CopyToS3 {
1658                global_lir_plan, ..
1659            } => {
1660                let df_desc = global_lir_plan.df_desc();
1661                let as_of = df_desc
1662                    .as_of
1663                    .clone()
1664                    .expect("dataflow has an as_of")
1665                    .into_element();
1666                (
1667                    df_desc.source_imports.keys().cloned().collect(),
1668                    df_desc.index_imports.keys().cloned().collect(),
1669                    as_of,
1670                    "CopyToS3",
1671                )
1672            }
1673            Execution::ExplainPlan { .. } | Execution::ExplainPushdown { .. } => {
1674                // No read holds assertions needed for EXPLAIN variants
1675                return;
1676            }
1677            Execution::Subscribe { df_desc, .. } => {
1678                let as_of = df_desc
1679                    .as_of
1680                    .clone()
1681                    .expect("dataflow has an as_of")
1682                    .into_element();
1683                (
1684                    df_desc.source_imports.keys().cloned().collect(),
1685                    df_desc.index_imports.keys().cloned().collect(),
1686                    as_of,
1687                    "Subscribe",
1688                )
1689            }
1690        };
1691
1692        // Assert that we have some read holds for all the imports of the dataflow.
1693        for id in source_imports.iter() {
1694            soft_assert_or_log!(
1695                read_holds.storage_holds.contains_key(id),
1696                "[{}] missing read hold for the source import {}; (in_immediate_multi_stmt_txn: {})",
1697                execution_name,
1698                id,
1699                in_immediate_multi_stmt_txn,
1700            );
1701        }
1702        for id in index_imports.iter() {
1703            soft_assert_or_log!(
1704                read_holds
1705                    .compute_ids()
1706                    .map(|(_instance, coll)| coll)
1707                    .contains(id),
1708                "[{}] missing read hold for the index import {}; (in_immediate_multi_stmt_txn: {})",
1709                execution_name,
1710                id,
1711                in_immediate_multi_stmt_txn,
1712            );
1713        }
1714
1715        // Also check the holds against the as_of.
1716        for (id, h) in read_holds.storage_holds.iter() {
1717            soft_assert_or_log!(
1718                h.since().less_equal(&as_of),
1719                "[{}] storage read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1720                execution_name,
1721                h.since(),
1722                id,
1723                as_of,
1724                determination,
1725                in_immediate_multi_stmt_txn,
1726            );
1727        }
1728        for ((instance, id), h) in read_holds.compute_holds.iter() {
1729            soft_assert_eq_or_log!(
1730                *instance,
1731                target_cluster_id,
1732                "[{}] the read hold on {} is on the wrong cluster; (in_immediate_multi_stmt_txn: {})",
1733                execution_name,
1734                id,
1735                in_immediate_multi_stmt_txn,
1736            );
1737            soft_assert_or_log!(
1738                h.since().less_equal(&as_of),
1739                "[{}] compute read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1740                execution_name,
1741                h.since(),
1742                id,
1743                as_of,
1744                determination,
1745                in_immediate_multi_stmt_txn,
1746            );
1747        }
1748    }
1749}
1750
1751/// Enum for branching among various execution steps after optimization
1752enum Execution {
1753    Peek {
1754        global_lir_plan: optimize::peek::GlobalLirPlan,
1755        optimization_finished_at: EpochMillis,
1756        plan_insights_optimizer_trace: Option<OptimizerTrace>,
1757        finishing: RowSetFinishing,
1758        copy_to: Option<plan::CopyFormat>,
1759        insights_ctx: Option<Box<PlanInsightsContext>>,
1760    },
1761    Subscribe {
1762        subscribe_plan: SubscribePlan,
1763        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
1764        df_meta: DataflowMetainfo,
1765        optimization_finished_at: EpochMillis,
1766    },
1767    CopyToS3 {
1768        global_lir_plan: optimize::copy_to::GlobalLirPlan,
1769        source_ids: BTreeSet<GlobalId>,
1770    },
1771    ExplainPlan {
1772        df_meta: DataflowMetainfo,
1773        explain_ctx: ExplainPlanContext,
1774        optimizer: optimize::peek::Optimizer,
1775        insights_ctx: Option<Box<PlanInsightsContext>>,
1776    },
1777    ExplainPushdown {
1778        imports: BTreeMap<GlobalId, mz_expr::MapFilterProject>,
1779        determination: TimestampDetermination,
1780    },
1781}