Skip to main content

mz_adapter/
frontend_peek.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::collections::BTreeSet;
12use std::sync::Arc;
13use std::time::Duration;
14
15use itertools::Itertools;
16use mz_adapter_types::dyncfgs::ENABLE_FRONTEND_SUBSCRIBES;
17use mz_compute_types::ComputeInstanceId;
18use mz_compute_types::dataflows::DataflowDescription;
19use mz_controller_types::ClusterId;
20use mz_expr::{CollectionPlan, ResultSpec, RowSetFinishing};
21use mz_ore::cast::{CastFrom, CastLossy};
22use mz_ore::collections::CollectionExt;
23use mz_ore::now::EpochMillis;
24use mz_ore::task::JoinHandle;
25use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
26use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
27use mz_repr::role_id::RoleId;
28use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
29use mz_sql::ast::Raw;
30use mz_sql::catalog::CatalogCluster;
31use mz_sql::plan::Params;
32use mz_sql::plan::{
33    self, Explainee, ExplaineeStatement, Plan, QueryWhen, SelectPlan, SubscribePlan,
34};
35use mz_sql::rbac;
36use mz_sql::session::metadata::SessionMetadata;
37use mz_sql::session::vars::IsolationLevel;
38use mz_sql_parser::ast::{CopyDirection, ExplainStage, ShowStatement, Statement};
39use mz_transform::EmptyStatisticsOracle;
40use mz_transform::dataflow::DataflowMetainfo;
41use opentelemetry::trace::TraceContextExt;
42use timely::progress::Antichain;
43use tracing::{Span, debug, warn};
44use tracing_opentelemetry::OpenTelemetrySpanExt;
45
46use crate::catalog::Catalog;
47use crate::command::Command;
48use crate::coord::peek::{FastPathPlan, PeekPlan};
49use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle};
50use crate::coord::timeline::timedomain_for;
51use crate::coord::timestamp_selection::TimestampDetermination;
52use crate::coord::{
53    Coordinator, CopyToContext, ExecuteContextGuard, ExplainContext, ExplainPlanContext,
54    TargetCluster,
55};
56use crate::explain::insights::PlanInsightsContext;
57use crate::explain::optimizer_trace::OptimizerTrace;
58use crate::optimize::Optimize;
59use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
60use crate::session::{Session, TransactionOps, TransactionStatus};
61use crate::statement_logging::WatchSetCreation;
62use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
63use crate::{
64    AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, PeekClient, ReadHolds,
65    TimelineContext, TimestampContext, TimestampProvider, optimize,
66};
67use crate::{coord, metrics};
68
69impl PeekClient {
70    /// Attempt to sequence a peek from the session task.
71    ///
72    /// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the
73    /// Coordinator's sequencing. If it returns an error, it should be returned to the user.
      ///
      /// Statement types accepted here: `SELECT`, `EXPLAIN ANALYZE` (object and cluster), `SHOW`
      /// objects/columns, `EXPLAIN` plan of a `SELECT`, `EXPLAIN FILTER PUSHDOWN` of a non-broken
      /// `SELECT`, `COPY ... TO`, and `SUBSCRIBE` when `ENABLE_FRONTEND_SUBSCRIBES` is set. Every
      /// other statement type yields `Ok(None)` before statement logging has begun. A portal
      /// without a statement skips this check and is passed on to `try_frontend_peek_inner`.
      ///
      /// `portal_name` names the portal from which the statement, its parameters, its logging
      /// state, and its lifecycle timestamps are read.
      ///
      /// `catalog` is the catalog to work against; when it is `None`, a snapshot is fetched via
      /// `catalog_snapshot`.
74    ///
75    /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
76    /// triggering the execution of the underlying query.
      ///
      /// # Errors
      ///
      /// Returns the error from `Coordinator::verify_portal` if the portal fails verification; in
      /// that case the guard in `outer_ctx_extra` (if any) is taken, defused, and retired first.
      /// Otherwise returns whatever error `try_frontend_peek_inner` produced, after recording it
      /// as the `Errored` end-of-execution reason when the statement is being logged.
      ///
      /// # Statement logging
      ///
      /// The end of execution is logged here for all results except streaming ones
      /// (`SendingRowsStreaming`, `Subscribing`, or a `CopyTo` wrapping either of these), for
      /// which the function returns without emitting the end-of-execution event.
77    pub(crate) async fn try_frontend_peek(
78        &mut self,
79        portal_name: &str,
80        catalog: Option<Arc<Catalog>>,
81        session: &mut Session,
82        outer_ctx_extra: &mut Option<ExecuteContextGuard>,
83    ) -> Result<Option<ExecuteResponse>, AdapterError> {
84        // # From handle_execute
85
          // If the session asked for it, surface the current trace ID to the user as a notice.
86        if session.vars().emit_trace_id_notice() {
87            let span_context = tracing::Span::current()
88                .context()
89                .span()
90                .span_context()
91                .clone();
92            if span_context.is_valid() {
93                session.add_notice(AdapterNotice::QueryTrace {
94                    trace_id: span_context.trace_id(),
95                });
96            }
97        }
98
99        // TODO(peek-seq): This snapshot is wasted when we end up bailing out from the frontend peek
100        // sequencing. We could solve this with the optimization where we
101        // continuously keep a catalog snapshot in the session, and only get a new one when the
102        // catalog revision has changed, which we could see with an atomic read.
103        // But anyhow, this problem will just go away when we reach the point that we never fall
104        // back to the old sequencing.
105        let catalog = match catalog {
106            Some(c) => c,
107            None => self.catalog_snapshot("try_frontend_peek").await,
108        };
109
110        // Extract things from the portal.
111        let (stmt, params, logging, lifecycle_timestamps) = {
112            if let Err(err) = Coordinator::verify_portal(&*catalog, session, portal_name) {
                   // We are returning an error without going through statement logging below, so
                   // take the outer statement's guard (if any) and defuse and retire it here.
113                outer_ctx_extra
114                    .take()
115                    .and_then(|guard| guard.defuse().retire());
116                return Err(err);
117            }
118            let portal = session
119                .get_portal_unverified(portal_name)
120                // The portal is a session-level thing, so it couldn't have concurrently disappeared
121                // since the above verification.
122                .expect("called verify_portal above");
               // Clone everything we need out of the portal, so that the borrow of `session` ends
               // with this block.
123            let params = portal.parameters.clone();
124            let stmt = portal.stmt.clone();
125            let logging = Arc::clone(&portal.logging);
126            let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
127            (stmt, params, logging, lifecycle_timestamps)
128        };
129
130        // Before planning, check if this is a statement type we can handle.
131        // This must happen BEFORE statement logging setup to avoid orphaned execution records.
           // (A portal with no statement skips this check entirely.)
132        if let Some(ref stmt) = stmt {
133            match &**stmt {
134                Statement::Select(_)
135                | Statement::ExplainAnalyzeObject(_)
136                | Statement::ExplainAnalyzeCluster(_)
137                | Statement::Show(ShowStatement::ShowObjects(_))
138                | Statement::Show(ShowStatement::ShowColumns(_)) => {
139                    // These are always fine, just continue.
140                    // Note: EXPLAIN ANALYZE will `plan` to `Plan::Select`.
141                    // Note: ShowObjects plans to `Plan::Select`, ShowColumns plans to `Plan::ShowColumns`.
142                    // We handle `Plan::ShowColumns` specially in `try_frontend_peek_inner`.
143                }
144                Statement::ExplainPlan(explain_stmt) => {
145                    // Only handle ExplainPlan for SELECT statements.
146                    // We don't want to handle e.g. EXPLAIN CREATE MATERIALIZED VIEW here, because that
147                    // requires purification before planning, which the frontend peek sequencing doesn't
148                    // do.
149                    match &explain_stmt.explainee {
150                        mz_sql_parser::ast::Explainee::Select(..) => {
151                            // This is a SELECT, continue
152                        }
153                        _ => {
154                            debug!(
155                                "Bailing out from try_frontend_peek, because EXPLAIN is not for a SELECT query"
156                            );
157                            return Ok(None);
158                        }
159                    }
160                }
161                Statement::ExplainPushdown(explain_stmt) => {
162                    // Only handle EXPLAIN FILTER PUSHDOWN for non-BROKEN SELECT statements
163                    match &explain_stmt.explainee {
164                        mz_sql_parser::ast::Explainee::Select(_, false) => {}
165                        _ => {
166                            debug!(
167                                "Bailing out from try_frontend_peek, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is for EXPLAIN BROKEN"
168                            );
169                            return Ok(None);
170                        }
171                    }
172                }
173                Statement::Copy(copy_stmt) => {
174                    match &copy_stmt.direction {
175                        CopyDirection::To => {
176                            // This is COPY TO (...), continue
177                        }
178                        CopyDirection::From => {
179                            debug!(
180                                "Bailing out from try_frontend_peek, because COPY FROM is not supported"
181                            );
182                            return Ok(None);
183                        }
184                    }
185                }
186
                   // SUBSCRIBE is only handled here when the dyncfg flag is on; otherwise it falls
                   // through to the catch-all arm below and we bail out.
187                Statement::Subscribe(_)
188                    if ENABLE_FRONTEND_SUBSCRIBES.get(catalog.system_config().dyncfgs()) =>
189                {
190                    // We have a subscribe statement to process; continue.
191                }
192                _ => {
193                    debug!(
194                        "Bailing out from try_frontend_peek, because statement type is not supported"
195                    );
196                    return Ok(None);
197                }
198            }
199        }
200
201        // Set up statement logging, and log the beginning of execution.
202        // (But only if we're not executing in the context of another statement.)
203        let logging_guard = self.begin_statement_logging(
204            session,
205            &params,
206            &logging,
207            &catalog,
208            lifecycle_timestamps,
209            outer_ctx_extra,
210        );
211        let statement_logging_id = logging_guard.id();
212        // Streaming peek/subscribe/slow-path/copy-to responses hand off the
213        // end-execution event to the coordinator (via
214        // `handle_peek_notification` / `cancel_pending_peeks`), so letting
215        // the RAII guard's `Drop` also emit `Aborted` on mid-flight drop
216        // would double-end and panic at `end_statement_execution`. Defuse
217        // and manage the lifecycle explicitly below: streaming arms skip
218        // emitting; non-streaming arms emit via `log_ended_execution`.
219        logging_guard.defuse();
220
           // The actual sequencing lives in the inner function, so that its `?` returns still
           // come back here and we can log the end of execution below.
221        let result = self
222            .try_frontend_peek_inner(session, catalog, stmt, params, statement_logging_id)
223            .await;
224
225        // Log the end of execution if we are logging this statement and
226        // execution has already ended.
227        if let Some(logging_id) = statement_logging_id {
228            let reason = match &result {
229                // Streaming results are handled asynchronously by the
230                // coordinator — it will log the end via
231                // `handle_peek_notification`, so skip emitting here.
232                Ok(Some(
233                    ExecuteResponse::SendingRowsStreaming { .. }
234                    | ExecuteResponse::Subscribing { .. },
235                )) => {
236                    return result;
237                }
238                // COPY TO wrapping a streaming response: same handoff.
239                Ok(Some(resp @ ExecuteResponse::CopyTo { resp: inner, .. })) => {
240                    match inner.as_ref() {
241                        ExecuteResponse::SendingRowsStreaming { .. }
242                        | ExecuteResponse::Subscribing { .. } => {
243                            return result;
244                        }
245                        // For non-streaming COPY TO responses, use the outer
246                        // CopyTo for conversion.
247                        _ => resp.into(),
248                    }
249                }
250                // Bailout case, which should not happen.
251                Ok(None) => {
252                    soft_panic_or_log!(
253                        "Bailed out from `try_frontend_peek_inner` after we already logged the beginning of statement execution."
254                    );
255                    // The old peek sequencing would start its own statement
256                    // logging from scratch; close out this one as errored.
257                    self.log_ended_execution(
258                        logging_id,
259                        StatementEndedExecutionReason::Errored {
260                            error: "Internal error: bailed out from `try_frontend_peek_inner`"
261                                .to_string(),
262                        },
263                    );
264                    return result;
265                }
266                // All other success responses — use the `From` implementation.
267                // TODO(peek-seq): After we delete the old peek sequencing, we
268                // can adjust the `From` impl to do exactly what we need here,
269                // so the special cases above won't be needed.
270                Ok(Some(resp)) => resp.into(),
                   // Errors end the execution with the error's display string as the reason.
271                Err(e) => StatementEndedExecutionReason::Errored {
272                    error: e.to_string(),
273                },
274            };
275
276            self.log_ended_execution(logging_id, reason);
277        }
278
279        result
280    }
281
282    /// This is encapsulated in an inner function so that the outer function can still do statement
283    /// logging after the `?` returns of the inner function.
284    async fn try_frontend_peek_inner(
285        &mut self,
286        session: &mut Session,
287        catalog: Arc<Catalog>,
288        stmt: Option<Arc<Statement<Raw>>>,
289        params: Params,
290        statement_logging_id: Option<crate::statement_logging::StatementLoggingId>,
291    ) -> Result<Option<ExecuteResponse>, AdapterError> {
292        let stmt = match stmt {
293            Some(stmt) => stmt,
294            None => {
295                debug!("try_frontend_peek_inner succeeded on an empty query");
296                return Ok(Some(ExecuteResponse::EmptyQuery));
297            }
298        };
299
300        session
301            .metrics()
302            .query_total(&[
303                metrics::session_type_label_value(session.user()),
304                metrics::statement_type_label_value(&stmt),
305            ])
306            .inc();
307
308        // # From handle_execute_inner
309
310        let conn_catalog = catalog.for_session(session);
311        // (`resolved_ids` should be derivable from `stmt`. If `stmt` is later transformed to
312        // remove/add IDs, then `resolved_ids` should be updated to also remove/add those IDs.)
313        let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?;
314
315        let pcx = session.pcx();
316        let (plan, sql_impl_ids) =
317            mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, &params, &resolved_ids)?;
318
319        /// What do we do with the result of the select?
320        enum QueryPlan<'a> {
321            Select(&'a SelectPlan),
322            CopyTo(&'a SelectPlan, CopyToContext),
323            Subscribe(&'a SubscribePlan),
324        }
325
326        let (query_plan, explain_ctx) = match &plan {
327            Plan::Select(select_plan) => {
328                let explain_ctx = if session.vars().emit_plan_insights_notice() {
329                    let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
330                    ExplainContext::PlanInsightsNotice(optimizer_trace)
331                } else {
332                    ExplainContext::None
333                };
334                (QueryPlan::Select(select_plan), explain_ctx)
335            }
336            Plan::ShowColumns(show_columns_plan) => {
337                // ShowColumns wraps a SelectPlan, extract it and proceed as normal.
338                (
339                    QueryPlan::Select(&show_columns_plan.select_plan),
340                    ExplainContext::None,
341                )
342            }
343            Plan::ExplainPlan(plan::ExplainPlanPlan {
344                stage,
345                format,
346                config,
347                explainee: Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc }),
348            }) => {
349                // Create OptimizerTrace to collect optimizer plans
350                let optimizer_trace = OptimizerTrace::new(stage.paths());
351                let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
352                    broken: *broken,
353                    config: config.clone(),
354                    format: *format,
355                    stage: *stage,
356                    replan: None,
357                    desc: Some(desc.clone()),
358                    optimizer_trace,
359                });
360                (QueryPlan::Select(plan), explain_ctx)
361            }
362            // COPY TO S3
363            Plan::CopyTo(plan::CopyToPlan {
364                select_plan,
365                desc,
366                to,
367                connection,
368                connection_id,
369                format,
370                max_file_size,
371            }) => {
372                let uri = eval_copy_to_uri(to.clone(), session, catalog.state())?;
373
374                // (output_batch_count will be set later)
375                let copy_to_ctx = CopyToContext {
376                    desc: desc.clone(),
377                    uri,
378                    connection: connection.clone(),
379                    connection_id: *connection_id,
380                    format: format.clone(),
381                    max_file_size: *max_file_size,
382                    output_batch_count: None,
383                };
384
385                (
386                    QueryPlan::CopyTo(select_plan, copy_to_ctx),
387                    ExplainContext::None,
388                )
389            }
390            Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => {
391                // Only handle EXPLAIN FILTER PUSHDOWN for SELECT statements
392                match explainee {
393                    plan::Explainee::Statement(plan::ExplaineeStatement::Select {
394                        broken: false,
395                        plan,
396                        desc: _,
397                    }) => {
398                        let explain_ctx = ExplainContext::Pushdown;
399                        (QueryPlan::Select(plan), explain_ctx)
400                    }
401                    _ => {
402                        // This shouldn't happen because we already checked for this at the AST
403                        // level before calling `try_frontend_peek_inner`.
404                        soft_panic_or_log!(
405                            "unexpected EXPLAIN FILTER PUSHDOWN plan kind in frontend peek sequencing: {:?}",
406                            explainee
407                        );
408                        debug!(
409                            "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN"
410                        );
411                        return Ok(None);
412                    }
413                }
414            }
415            Plan::SideEffectingFunc(sef_plan) => {
416                // Side-effecting functions need Coordinator state (e.g., active_conns),
417                // so delegate to the Coordinator via a Command.
418                // The RBAC check is performed in the Coordinator where active_conns is available.
419                let response = self
420                    .call_coordinator(|tx| Command::ExecuteSideEffectingFunc {
421                        plan: sef_plan.clone(),
422                        conn_id: session.conn_id().clone(),
423                        current_role: session.role_metadata().current_role,
424                        tx,
425                    })
426                    .await?;
427                return Ok(Some(response));
428            }
429            Plan::Subscribe(subscribe) => (QueryPlan::Subscribe(subscribe), ExplainContext::None),
430            _ => {
431                // This shouldn't happen because we already checked for this at the AST
432                // level before calling `try_frontend_peek_inner`.
433                soft_panic_or_log!(
434                    "Unexpected plan kind in frontend peek sequencing: {:?}",
435                    plan
436                );
437                debug!(
438                    "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, side-effecting SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO S3"
439                );
440                return Ok(None);
441            }
442        };
443
444        let when = match query_plan {
445            QueryPlan::Select(s) => &s.when,
446            QueryPlan::CopyTo(s, _) => &s.when,
447            QueryPlan::Subscribe(s) => &s.when,
448        };
449
450        let depends_on = match query_plan {
451            QueryPlan::Select(s) => s.source.depends_on(),
452            QueryPlan::CopyTo(s, _) => s.source.depends_on(),
453            QueryPlan::Subscribe(s) => s.from.depends_on(),
454        };
455
456        let contains_temporal = match query_plan {
457            QueryPlan::Select(s) => s.source.contains_temporal(),
458            QueryPlan::CopyTo(s, _) => s.source.contains_temporal(),
459            QueryPlan::Subscribe(s) => Ok(s.from.contains_temporal()),
460        };
461
462        // # From sequence_plan
463
464        // We have checked the plan kind above.
465        assert!(plan.allowed_in_read_only());
466
467        let (cluster, target_cluster_id, target_cluster_name) = {
468            let target_cluster = match session.transaction().cluster() {
469                // Use the current transaction's cluster.
470                Some(cluster_id) => TargetCluster::Transaction(cluster_id),
471                // If there isn't a current cluster set for a transaction, then try to auto route.
472                None => coord::catalog_serving::auto_run_on_catalog_server(
473                    &conn_catalog,
474                    session,
475                    &plan,
476                ),
477            };
478            let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
479            (cluster, cluster.id, &cluster.name)
480        };
481
482        // Log cluster selection
483        if let Some(logging_id) = &statement_logging_id {
484            self.log_set_cluster(*logging_id, target_cluster_id);
485        }
486
487        coord::catalog_serving::check_cluster_restrictions(
488            target_cluster_name.as_str(),
489            &conn_catalog,
490            &plan,
491        )?;
492
493        rbac::check_plan(
494            &conn_catalog,
495            // We can't look at `active_conns` here, but that's ok, because this case was handled
496            // above already inside `Command::ExecuteSideEffectingFunc`.
497            None::<fn(u32) -> Option<RoleId>>,
498            session,
499            &plan,
500            Some(target_cluster_id),
501            &resolved_ids,
502            &sql_impl_ids,
503        )?;
504
505        if let Some((_, wait_future)) =
506            coord::appends::waiting_on_startup_appends(&*catalog, session, &plan)
507        {
508            wait_future.await;
509        }
510
511        let max_query_result_size = Some(session.vars().max_query_result_size());
512
513        // # From sequence_peek
514
515        // # From peek_validate
516
517        let compute_instance_snapshot =
518            ComputeInstanceSnapshot::new_without_collections(cluster.id());
519
520        let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config())
521            .override_from(&catalog.get_cluster(cluster.id()).config.features())
522            .override_from(&explain_ctx);
523
524        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
525            return Err(AdapterError::NoClusterReplicasAvailable {
526                name: cluster.name.clone(),
527                is_managed: cluster.is_managed(),
528            });
529        }
530
531        let (_, view_id) = self.transient_id_gen.allocate_id();
532        let (_, index_id) = self.transient_id_gen.allocate_id();
533
534        let target_replica_name = session.vars().cluster_replica();
535        let mut target_replica = target_replica_name
536            .map(|name| {
537                cluster
538                    .replica_id(name)
539                    .ok_or(AdapterError::UnknownClusterReplica {
540                        cluster_name: cluster.name.clone(),
541                        replica_name: name.to_string(),
542                    })
543            })
544            .transpose()?;
545
546        let source_ids = depends_on;
547        // TODO(peek-seq): validate_timeline_context can be expensive in real scenarios (not in
548        // simple benchmarks), because it traverses transitive dependencies even of indexed views and
549        // materialized views (also traversing their MIR plans).
550        let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?;
551        if matches!(timeline_context, TimelineContext::TimestampIndependent) && contains_temporal? {
552            // If the source IDs are timestamp independent but the query contains temporal functions,
553            // then the timeline context needs to be upgraded to timestamp dependent. This is
554            // required because `source_ids` doesn't contain functions.
555            timeline_context = TimelineContext::TimestampDependent;
556        }
557
558        let notices = coord::sequencer::check_log_reads(
559            &catalog,
560            cluster,
561            &source_ids,
562            &mut target_replica,
563            session.vars(),
564        )?;
565        session.add_notices(notices);
566
567        // # From peek_linearize_timestamp
568
569        let isolation_level = session.vars().transaction_isolation().clone();
570        let timeline = Coordinator::get_timeline(&timeline_context);
571        let needs_linearized_read_ts =
572            Coordinator::needs_linearized_read_ts(&isolation_level, when);
573
574        let oracle_read_ts = match timeline {
575            Some(timeline) if needs_linearized_read_ts => {
576                let oracle = self.ensure_oracle(timeline).await?;
577                let oracle_read_ts = oracle.read_ts().await;
578                Some(oracle_read_ts)
579            }
580            Some(_) | None => None,
581        };
582
583        // # From peek_real_time_recency
584
585        let vars = session.vars();
586        let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
587            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
588            && !session.contains_read_timestamp()
589        {
590            // Only call the coordinator when we actually need real-time recency
591            self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
592                source_ids: source_ids.clone(),
593                real_time_recency_timeout: *vars.real_time_recency_timeout(),
594                tx,
595            })
596            .await?
597        } else {
598            None
599        };
600
601        // # From peek_timestamp_read_hold
602
603        let dataflow_builder =
604            DataflowBuilder::new(catalog.state(), compute_instance_snapshot.clone());
605        let input_id_bundle = dataflow_builder.sufficient_collections(source_ids.clone());
606
607        // ## From sequence_peek_timestamp
608
609        // Warning: This will be false for AS OF queries, even if we are otherwise inside a
610        // multi-statement transaction. (It's also false for FreshestTableWrite, which is currently
611        // only read-then-write queries, which can't be part of multi-statement transactions, so
612        // FreshestTableWrite doesn't matter.)
613        //
614        // TODO(peek-seq): It's not totally clear to me what the intended semantics are for AS OF
615        // queries inside a transaction: We clearly can't use the transaction timestamp, but the old
616        // peek sequencing still does a timedomain validation. The new peek sequencing does not do
617        // timedomain validation for AS OF queries, which seems more natural. But I'm thinking that
618        // it would be the cleanest to just simply disallow AS OF queries inside transactions.
619        let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when)
620            && !matches!(query_plan, QueryPlan::Subscribe { .. });
621
622        // Fetch or generate a timestamp for this query and fetch or acquire read holds.
623        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
624            // Use the transaction's timestamp if it exists and this isn't an AS OF query.
625            // (`in_immediate_multi_stmt_txn` is false for AS OF queries.)
626            Some(
627                determination @ TimestampDetermination {
628                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
629                    ..
630                },
631            ) if in_immediate_multi_stmt_txn => {
632                // This is a subsequent (non-AS OF, non-constant) query in a multi-statement
633                // transaction. We now:
634                // - Validate that the query only accesses collections within the transaction's
635                //   timedomain (which we know from the stored read holds).
636                // - Use the transaction's stored timestamp determination.
637                // - Use the (relevant subset of the) transaction's read holds.
638
639                let txn_read_holds_opt = self
640                    .call_coordinator(|tx| Command::GetTransactionReadHoldsBundle {
641                        conn_id: session.conn_id().clone(),
642                        tx,
643                    })
644                    .await;
645
646                if let Some(txn_read_holds) = txn_read_holds_opt {
647                    let allowed_id_bundle = txn_read_holds.id_bundle();
648                    let outside = input_id_bundle.difference(&allowed_id_bundle);
649
650                    // Queries without a timestamp and timeline can belong to any existing timedomain.
651                    if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
652                        let valid_names =
653                            allowed_id_bundle.resolve_names(&*catalog, session.conn_id());
654                        let invalid_names = outside.resolve_names(&*catalog, session.conn_id());
655                        return Err(AdapterError::RelationOutsideTimeDomain {
656                            relations: invalid_names,
657                            names: valid_names,
658                        });
659                    }
660
661                    // Extract the subset of read holds for the collections this query accesses.
662                    let read_holds = txn_read_holds.subset(&input_id_bundle);
663
664                    (determination, read_holds)
665                } else {
666                    // This should never happen: we're in a subsequent query of a multi-statement
667                    // transaction (we have a transaction timestamp), but the coordinator has no
668                    // transaction read holds stored. This indicates a bug in the transaction
669                    // handling.
670                    return Err(AdapterError::Internal(
671                        "Missing transaction read holds for multi-statement transaction"
672                            .to_string(),
673                    ));
674                }
675            }
676            _ => {
677                // There is no timestamp determination yet for this transaction. Either:
678                // - We are not in a multi-statement transaction.
679                // - This is the first (non-AS OF) query in a multi-statement transaction.
680                // - This is an AS OF query.
681                // - This is a constant query (`TimestampContext::NoTimestamp`).
682
683                let timedomain_bundle;
684                let determine_bundle = if in_immediate_multi_stmt_txn {
685                    // This is the first (non-AS OF) query in a multi-statement transaction.
686                    // Determine a timestamp that will be valid for anything in any schema
687                    // referenced by the first query.
688                    timedomain_bundle = timedomain_for(
689                        &*catalog,
690                        &dataflow_builder,
691                        &source_ids,
692                        &timeline_context,
693                        session.conn_id(),
694                        target_cluster_id,
695                    )?;
696                    &timedomain_bundle
697                } else {
698                    // Simply use the inputs of the current query.
699                    &input_id_bundle
700                };
701                let (determination, read_holds) = self
702                    .frontend_determine_timestamp(
703                        session,
704                        determine_bundle,
705                        when,
706                        target_cluster_id,
707                        &timeline_context,
708                        oracle_read_ts,
709                        real_time_recency_ts,
710                    )
711                    .await?;
712
713                // If this is the first (non-AS OF) query in a multi-statement transaction, store
714                // the read holds in the coordinator, so subsequent queries can validate against
715                // them.
716                if in_immediate_multi_stmt_txn {
717                    self.call_coordinator(|tx| Command::StoreTransactionReadHolds {
718                        conn_id: session.conn_id().clone(),
719                        read_holds: read_holds.clone(),
720                        tx,
721                    })
722                    .await;
723                }
724
725                (determination, read_holds)
726            }
727        };
728
729        {
730            // Assert that we have a read hold for all the collections in our `input_id_bundle`.
731            for id in input_id_bundle.iter() {
732                let s = read_holds.storage_holds.contains_key(&id);
733                let c = read_holds
734                    .compute_ids()
735                    .map(|(_instance, coll)| coll)
736                    .contains(&id);
737                soft_assert_or_log!(
738                    s || c,
739                    "missing read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
740                    id,
741                    in_immediate_multi_stmt_txn,
742                );
743            }
744
745            // Assert that each part of the `input_id_bundle` corresponds to the right part of
746            // `read_holds`.
747            for id in input_id_bundle.storage_ids.iter() {
748                soft_assert_or_log!(
749                    read_holds.storage_holds.contains_key(id),
750                    "missing storage read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
751                    id,
752                    in_immediate_multi_stmt_txn,
753                );
754            }
755            for id in input_id_bundle
756                .compute_ids
757                .iter()
758                .flat_map(|(_instance, colls)| colls)
759            {
760                soft_assert_or_log!(
761                    read_holds
762                        .compute_ids()
763                        .map(|(_instance, coll)| coll)
764                        .contains(id),
765                    "missing compute read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
766                    id,
767                    in_immediate_multi_stmt_txn,
768                );
769            }
770        }
771
772        // (TODO(peek-seq): The below TODO is copied from the old peek sequencing. We should resolve
773        // this when we decide what to do with `AS OF` in transactions.)
774        // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
775        // arbitrary and we don't recall why we did it (possibly an error!). Change this to always
776        // set the transaction ops. Decide and document what our policy should be on AS OF queries.
777        // Maybe they shouldn't be allowed in transactions at all because it's hard to explain
778        // what's going on there. This should probably get a small design document.
779
780        // We only track the peeks in the session if the query doesn't use AS
781        // OF or we're inside an explicit transaction. The latter case is
782        // necessary to support PG's `BEGIN` semantics, whose behavior can
783        // depend on whether or not reads have occurred in the txn.
784        let requires_linearization = (&explain_ctx).into();
785        let mut transaction_determination = determination.clone();
786        match query_plan {
787            QueryPlan::Subscribe { .. } => {
788                if when.is_transactional() {
789                    session.add_transaction_ops(TransactionOps::Subscribe)?;
790                }
791            }
792            QueryPlan::Select(..) | QueryPlan::CopyTo(..) => {
793                if when.is_transactional() {
794                    session.add_transaction_ops(TransactionOps::Peeks {
795                        determination: transaction_determination,
796                        cluster_id: target_cluster_id,
797                        requires_linearization,
798                    })?;
799                } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
800                    // If the query uses AS OF, then ignore the timestamp.
801                    transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
802                    session.add_transaction_ops(TransactionOps::Peeks {
803                        determination: transaction_determination,
804                        cluster_id: target_cluster_id,
805                        requires_linearization,
806                    })?;
807                }
808            }
809        }
810
811        // # From peek_optimize
812
813        let stats = statistics_oracle(
814            session,
815            &source_ids,
816            &determination.timestamp_context.antichain(),
817            true,
818            catalog.system_config(),
819            &*self.storage_collections,
820        )
821        .await
822        .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
823
824        // Generate data structures that can be moved to another task where we will perform possibly
825        // expensive optimizations.
826        let timestamp_context = determination.timestamp_context.clone();
827        let session_meta = session.meta();
828        let now = catalog.config().now.clone();
829        let target_cluster_name = target_cluster_name.clone();
830        let needs_plan_insights = explain_ctx.needs_plan_insights();
831        let determination_for_pushdown = if matches!(explain_ctx, ExplainContext::Pushdown) {
832            // This is a hairy data structure, so avoid this clone if we are not in
833            // EXPLAIN FILTER PUSHDOWN.
834            Some(determination.clone())
835        } else {
836            None
837        };
838
839        let span = Span::current();
840
841        // Prepare data for plan insights if needed
842        let catalog_for_insights = if needs_plan_insights {
843            Some(Arc::clone(&catalog))
844        } else {
845            None
846        };
847        let mut compute_instances = BTreeMap::new();
848        if needs_plan_insights {
849            for user_cluster in catalog.user_clusters() {
850                let snapshot = ComputeInstanceSnapshot::new_without_collections(user_cluster.id);
851                compute_instances.insert(user_cluster.name.clone(), snapshot);
852            }
853        }
854
855        let source_ids_for_closure = source_ids.clone();
856
857        let optimization_future: JoinHandle<Result<_, AdapterError>> = match query_plan {
858            QueryPlan::CopyTo(select_plan, mut copy_to_ctx) => {
859                let raw_expr = select_plan.source.clone();
860
861                // COPY TO path: calculate output_batch_count and create copy_to optimizer
862                let worker_counts = cluster.replicas().map(|r| {
863                    let loc = &r.config.location;
864                    loc.workers().unwrap_or_else(|| loc.num_processes())
865                });
866                let max_worker_count = match worker_counts.max() {
867                    Some(count) => u64::cast_from(count),
868                    None => {
869                        return Err(AdapterError::NoClusterReplicasAvailable {
870                            name: cluster.name.clone(),
871                            is_managed: cluster.is_managed(),
872                        });
873                    }
874                };
875                copy_to_ctx.output_batch_count = Some(max_worker_count);
876
877                let mut optimizer = optimize::copy_to::Optimizer::new(
878                    Arc::clone(&catalog),
879                    compute_instance_snapshot,
880                    view_id,
881                    copy_to_ctx,
882                    optimizer_config,
883                    self.optimizer_metrics.clone(),
884                );
885
886                mz_ore::task::spawn_blocking(
887                    || "optimize copy-to",
888                    move || {
889                        span.in_scope(|| {
890                            let _dispatch_guard = explain_ctx.dispatch_guard();
891
892                            // COPY TO path
893                            // HIR ⇒ MIR lowering and MIR optimization (local)
894                            let local_mir_plan =
895                                optimizer.catch_unwind_optimize(raw_expr.clone())?;
896                            // Attach resolved context required to continue the pipeline.
897                            let local_mir_plan = local_mir_plan.resolve(
898                                timestamp_context.clone(),
899                                &session_meta,
900                                stats,
901                            );
902                            // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
903                            let global_lir_plan =
904                                optimizer.catch_unwind_optimize(local_mir_plan)?;
905                            Ok(Execution::CopyToS3 {
906                                global_lir_plan,
907                                source_ids: source_ids_for_closure,
908                            })
909                        })
910                    },
911                )
912            }
913            QueryPlan::Select(select_plan) => {
914                let select_plan = select_plan.clone();
915                let raw_expr = select_plan.source.clone();
916
917                // SELECT/EXPLAIN path: create peek optimizer
918                let mut optimizer = optimize::peek::Optimizer::new(
919                    Arc::clone(&catalog),
920                    compute_instance_snapshot,
921                    select_plan.finishing.clone(),
922                    view_id,
923                    index_id,
924                    optimizer_config,
925                    self.optimizer_metrics.clone(),
926                );
927
928                mz_ore::task::spawn_blocking(
929                    || "optimize peek",
930                    move || {
931                        span.in_scope(|| {
932                            let _dispatch_guard = explain_ctx.dispatch_guard();
933
934                            // SELECT/EXPLAIN path
935                            // HIR ⇒ MIR lowering and MIR optimization (local)
936
937                            // The purpose of wrapping the following in a closure is to control where the
938                            // `?`s return from, so that even when a `catch_unwind_optimize` call fails,
939                            // we can still handle `EXPLAIN BROKEN`.
940                            let pipeline = || {
941                                let local_mir_plan =
942                                    optimizer.catch_unwind_optimize(raw_expr.clone())?;
943                                // Attach resolved context required to continue the pipeline.
944                                let local_mir_plan = local_mir_plan.resolve(
945                                    timestamp_context.clone(),
946                                    &session_meta,
947                                    stats,
948                                );
949                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
950                                let global_lir_plan =
951                                    optimizer.catch_unwind_optimize(local_mir_plan)?;
952                                Ok::<_, AdapterError>(global_lir_plan)
953                            };
954
955                            let global_lir_plan_result = pipeline();
956                            let optimization_finished_at = now();
957
958                            let create_insights_ctx =
959                                |optimizer: &optimize::peek::Optimizer,
960                                 is_notice: bool|
961                                 -> Option<Box<PlanInsightsContext>> {
962                                    if !needs_plan_insights {
963                                        return None;
964                                    }
965
966                                    let catalog = catalog_for_insights.as_ref()?;
967
968                                    let enable_re_optimize = if needs_plan_insights {
969                                        // Disable any plan insights that use the optimizer if we only want the
970                                        // notice and plan optimization took longer than the threshold. This is
971                                        // to prevent a situation where optimizing takes a while and there are
972                                        // lots of clusters, which would delay peek execution by the product of
973                                        // those.
974                                        //
975                                        // (This heuristic doesn't work well, see #9492.)
976                                        let dyncfgs = catalog.system_config().dyncfgs();
977                                        let opt_limit = mz_adapter_types::dyncfgs
978                                        ::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
979                                            .get(dyncfgs);
980                                        !(is_notice && optimizer.duration() > opt_limit)
981                                    } else {
982                                        false
983                                    };
984
985                                    Some(Box::new(PlanInsightsContext {
986                                        stmt: select_plan
987                                            .select
988                                            .as_deref()
989                                            .map(Clone::clone)
990                                            .map(Statement::Select),
991                                        raw_expr: raw_expr.clone(),
992                                        catalog: Arc::clone(catalog),
993                                        compute_instances,
994                                        target_instance: target_cluster_name,
995                                        metrics: optimizer.metrics().clone(),
996                                        finishing: optimizer.finishing().clone(),
997                                        optimizer_config: optimizer.config().clone(),
998                                        session: session_meta,
999                                        timestamp_context,
1000                                        view_id: optimizer.select_id(),
1001                                        index_id: optimizer.index_id(),
1002                                        enable_re_optimize,
1003                                    }))
1004                                };
1005
1006                            let global_lir_plan = match global_lir_plan_result {
1007                                Ok(plan) => plan,
1008                                Err(err) => {
1009                                    let result = if let ExplainContext::Plan(explain_ctx) =
1010                                        explain_ctx
1011                                        && explain_ctx.broken
1012                                    {
1013                                        // EXPLAIN BROKEN: log error and continue with defaults
1014                                        tracing::error!(
1015                                            "error while handling EXPLAIN statement: {}",
1016                                            err
1017                                        );
1018                                        Ok(Execution::ExplainPlan {
1019                                            df_meta: Default::default(),
1020                                            explain_ctx,
1021                                            optimizer,
1022                                            insights_ctx: None,
1023                                        })
1024                                    } else {
1025                                        Err(err)
1026                                    };
1027                                    return result;
1028                                }
1029                            };
1030
1031                            match explain_ctx {
1032                                ExplainContext::Plan(explain_ctx) => {
1033                                    let (_, df_meta, _) = global_lir_plan.unapply();
1034                                    let insights_ctx = create_insights_ctx(&optimizer, false);
1035                                    Ok(Execution::ExplainPlan {
1036                                        df_meta,
1037                                        explain_ctx,
1038                                        optimizer,
1039                                        insights_ctx,
1040                                    })
1041                                }
1042                                ExplainContext::None => Ok(Execution::Peek {
1043                                    global_lir_plan,
1044                                    optimization_finished_at,
1045                                    plan_insights_optimizer_trace: None,
1046                                    finishing: select_plan.finishing,
1047                                    copy_to: select_plan.copy_to,
1048                                    insights_ctx: None,
1049                                }),
1050                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
1051                                    let insights_ctx = create_insights_ctx(&optimizer, true);
1052                                    Ok(Execution::Peek {
1053                                        global_lir_plan,
1054                                        optimization_finished_at,
1055                                        plan_insights_optimizer_trace: Some(optimizer_trace),
1056                                        finishing: select_plan.finishing,
1057                                        copy_to: select_plan.copy_to,
1058                                        insights_ctx,
1059                                    })
1060                                }
1061                                ExplainContext::Pushdown => {
1062                                    let (plan, _, _) = global_lir_plan.unapply();
1063                                    let imports = match plan {
1064                                        PeekPlan::SlowPath(plan) => plan
1065                                            .desc
1066                                            .source_imports
1067                                            .into_iter()
1068                                            .filter_map(|(id, import)| {
1069                                                import.desc.arguments.operators.map(|mfp| (id, mfp))
1070                                            })
1071                                            .collect(),
1072                                        PeekPlan::FastPath(_) => {
1073                                            std::collections::BTreeMap::default()
1074                                        }
1075                                    };
1076                                    Ok(Execution::ExplainPushdown {
1077                                        imports,
1078                                        determination: determination_for_pushdown
1079                                            .expect("it's present for the ExplainPushdown case"),
1080                                    })
1081                                }
1082                            }
1083                        })
1084                    },
1085                )
1086            }
1087            QueryPlan::Subscribe(plan) => {
1088                let plan = plan.clone();
1089                let catalog: Arc<Catalog> = Arc::clone(&catalog);
1090                let debug_name = format!("subscribe-{}", index_id);
1091                let mut optimizer = optimize::subscribe::Optimizer::new(
1092                    catalog,
1093                    compute_instance_snapshot.clone(),
1094                    view_id,
1095                    index_id,
1096                    plan.with_snapshot,
1097                    plan.up_to,
1098                    debug_name,
1099                    optimizer_config,
1100                    self.optimizer_metrics.clone(),
1101                );
1102                mz_ore::task::spawn_blocking(
1103                    || "optimize subscribe",
1104                    move || {
1105                        span.in_scope(|| {
1106                            let _dispatch_guard = explain_ctx.dispatch_guard();
1107
1108                            let global_mir_plan = optimizer.catch_unwind_optimize(plan.clone())?;
1109                            let as_of = timestamp_context.timestamp_or_default();
1110
1111                            if let Some(up_to) = optimizer.up_to() {
1112                                if as_of > up_to {
1113                                    return Err(AdapterError::AbsurdSubscribeBounds {
1114                                        as_of,
1115                                        up_to,
1116                                    });
1117                                }
1118                            }
1119                            let local_mir_plan =
1120                                global_mir_plan.resolve(Antichain::from_elem(as_of));
1121
1122                            let global_lir_plan =
1123                                optimizer.catch_unwind_optimize(local_mir_plan)?;
1124                            let optimization_finished_at = now();
1125
1126                            let (df_desc, df_meta) = global_lir_plan.unapply();
1127                            Ok(Execution::Subscribe {
1128                                subscribe_plan: plan,
1129                                df_desc,
1130                                df_meta,
1131                                optimization_finished_at,
1132                            })
1133                        })
1134                    },
1135                )
1136            }
1137        };
1138
1139        let mut optimization_timeout = *session.vars().statement_timeout();
1140        // Timeout of 0 is equivalent to "off", meaning we will wait "forever."
1141        if optimization_timeout == Duration::ZERO {
1142            optimization_timeout = Duration::MAX;
1143        }
1144        let optimization_result =
1145            // Note: spawn_blocking tasks cannot be cancelled, so on timeout we stop waiting but the
1146            // optimization task continues running in the background until completion. See
1147            // https://github.com/MaterializeInc/database-issues/issues/8644 for properly cancelling
1148            // optimizer runs.
1149            match tokio::time::timeout(optimization_timeout, optimization_future).await {
1150                Ok(Ok(result)) => result,
1151                Ok(Err(AdapterError::Optimizer(err))) => {
1152                    return Err(AdapterError::Internal(format!(
1153                        "internal error in optimizer: {}",
1154                        err
1155                    )));
1156                }
1157                Ok(Err(err)) => {
1158                    return Err(err);
1159                }
1160                Err(_elapsed) => {
1161                    warn!("optimize peek timed out after {:?}", optimization_timeout);
1162                    return Err(AdapterError::StatementTimeout);
1163                }
1164            };
1165
1166        // Log optimization finished
1167        if let Some(logging_id) = &statement_logging_id {
1168            self.log_lifecycle_event(*logging_id, StatementLifecycleEvent::OptimizationFinished);
1169        }
1170
1171        // Assert that read holds are correct for the execution plan
1172        Self::assert_read_holds_correct(
1173            &read_holds,
1174            &optimization_result,
1175            &determination,
1176            target_cluster_id,
1177            in_immediate_multi_stmt_txn,
1178        );
1179
1180        // Handle the optimization result: either generate EXPLAIN output or continue with execution
1181        match optimization_result {
1182            Execution::ExplainPlan {
1183                df_meta,
1184                explain_ctx,
1185                optimizer,
1186                insights_ctx,
1187            } => {
1188                let rows = coord::sequencer::explain_plan_inner(
1189                    session,
1190                    &catalog,
1191                    df_meta,
1192                    explain_ctx,
1193                    optimizer,
1194                    insights_ctx,
1195                )
1196                .await?;
1197
1198                Ok(Some(ExecuteResponse::SendingRowsImmediate {
1199                    rows: Box::new(rows.into_row_iter()),
1200                }))
1201            }
1202            Execution::ExplainPushdown {
1203                imports,
1204                determination,
1205            } => {
1206                // # From peek_explain_pushdown
1207
1208                let as_of = determination.timestamp_context.antichain();
1209                let mz_now = determination
1210                    .timestamp_context
1211                    .timestamp()
1212                    .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1213                    .unwrap_or_else(ResultSpec::value_all);
1214
1215                Ok(Some(
1216                    coord::sequencer::explain_pushdown_future_inner(
1217                        session,
1218                        &*catalog,
1219                        &self.storage_collections,
1220                        as_of,
1221                        mz_now,
1222                        imports,
1223                    )
1224                    .await
1225                    .await?,
1226                ))
1227            }
1228            Execution::Peek {
1229                global_lir_plan,
1230                optimization_finished_at: _optimization_finished_at,
1231                plan_insights_optimizer_trace,
1232                finishing,
1233                copy_to,
1234                insights_ctx,
1235            } => {
1236                // Continue with normal execution
1237                // # From peek_finish
1238
1239                // The typ here was generated from the HIR SQL type and simply stored in LIR.
1240                let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
1241
1242                coord::sequencer::emit_optimizer_notices(
1243                    &*catalog,
1244                    session,
1245                    &df_meta.optimizer_notices,
1246                );
1247
1248                // Generate plan insights notice if needed
1249                if let Some(trace) = plan_insights_optimizer_trace {
1250                    let target_cluster = catalog.get_cluster(target_cluster_id);
1251                    let features = OptimizerFeatures::from(catalog.system_config())
1252                        .override_from(&target_cluster.config.features());
1253                    let insights = trace
1254                        .into_plan_insights(
1255                            &features,
1256                            &catalog.for_session(session),
1257                            Some(finishing.clone()),
1258                            Some(target_cluster),
1259                            df_meta.clone(),
1260                            insights_ctx,
1261                        )
1262                        .await?;
1263                    session.add_notice(AdapterNotice::PlanInsights(insights));
1264                }
1265
1266                // # Now back to peek_finish
1267
1268                let watch_set = statement_logging_id.map(|logging_id| {
1269                    WatchSetCreation::new(
1270                        logging_id,
1271                        catalog.state(),
1272                        &input_id_bundle,
1273                        determination.timestamp_context.timestamp_or_default(),
1274                    )
1275                });
1276
1277                let max_result_size = catalog.system_config().max_result_size();
1278
1279                // Clone determination if we need it for emit_timestamp_notice, since it may be
1280                // moved into Command::ExecuteSlowPathPeek.
1281                let determination_for_notice = if session.vars().emit_timestamp_notice() {
1282                    Some(determination.clone())
1283                } else {
1284                    None
1285                };
1286
1287                let response = match peek_plan {
1288                    PeekPlan::FastPath(fast_path_plan) => {
1289                        if let Some(logging_id) = &statement_logging_id {
1290                            // TODO(peek-seq): Actually, we should log it also for
1291                            // FastPathPlan::Constant. The only reason we are not doing so at the
1292                            // moment is to match the old peek sequencing, so that statement logging
1293                            // tests pass with the frontend peek sequencing turned both on and off.
1294                            //
1295                            // When the old sequencing is removed, we should make a couple of
1296                            // changes in how we log timestamps:
1297                            // - Move this up to just after timestamp determination, so that it
1298                            //   appears in the log as soon as possible.
1299                            // - Do it also for Constant peeks.
1300                            // - Currently, slow-path peeks' timestamp logging is done by
1301                            //   `implement_peek_plan`. We could remove it from there, and just do
1302                            //   it here.
1303                            if !matches!(fast_path_plan, FastPathPlan::Constant(..)) {
1304                                self.log_set_timestamp(
1305                                    *logging_id,
1306                                    determination.timestamp_context.timestamp_or_default(),
1307                                );
1308                            }
1309                        }
1310
1311                        let row_set_finishing_seconds =
1312                            session.metrics().row_set_finishing_seconds().clone();
1313
1314                        let peek_stash_read_batch_size_bytes =
1315                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
1316                                .get(catalog.system_config().dyncfgs());
1317                        let peek_stash_read_memory_budget_bytes =
1318                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
1319                                .get(catalog.system_config().dyncfgs());
1320
1321                        self.implement_fast_path_peek_plan(
1322                            fast_path_plan,
1323                            determination.timestamp_context.timestamp_or_default(),
1324                            finishing,
1325                            target_cluster_id,
1326                            target_replica,
1327                            typ,
1328                            max_result_size,
1329                            max_query_result_size,
1330                            row_set_finishing_seconds,
1331                            read_holds,
1332                            peek_stash_read_batch_size_bytes,
1333                            peek_stash_read_memory_budget_bytes,
1334                            session.conn_id().clone(),
1335                            source_ids,
1336                            watch_set,
1337                        )
1338                        .await?
1339                    }
1340                    PeekPlan::SlowPath(dataflow_plan) => {
1341                        if let Some(logging_id) = &statement_logging_id {
1342                            self.log_set_transient_index_id(*logging_id, dataflow_plan.id);
1343                        }
1344
1345                        self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
1346                            dataflow_plan: Box::new(dataflow_plan),
1347                            determination,
1348                            finishing,
1349                            compute_instance: target_cluster_id,
1350                            target_replica,
1351                            intermediate_result_type: typ,
1352                            source_ids,
1353                            conn_id: session.conn_id().clone(),
1354                            max_result_size,
1355                            max_query_result_size,
1356                            watch_set,
1357                            tx,
1358                        })
1359                        .await?
1360                    }
1361                };
1362
1363                // Add timestamp notice if emit_timestamp_notice is enabled
1364                if let Some(determination) = determination_for_notice {
1365                    let explanation = self
1366                        .call_coordinator(|tx| Command::ExplainTimestamp {
1367                            conn_id: session.conn_id().clone(),
1368                            session_wall_time: session.pcx().wall_time,
1369                            cluster_id: target_cluster_id,
1370                            id_bundle: input_id_bundle.clone(),
1371                            determination,
1372                            tx,
1373                        })
1374                        .await;
1375                    session.add_notice(AdapterNotice::QueryTimestamp { explanation });
1376                }
1377
1378                Ok(Some(match copy_to {
1379                    None => response,
1380                    // COPY TO STDOUT
1381                    Some(format) => ExecuteResponse::CopyTo {
1382                        format,
1383                        resp: Box::new(response),
1384                    },
1385                }))
1386            }
1387            Execution::Subscribe {
1388                subscribe_plan,
1389                df_desc,
1390                df_meta,
1391                optimization_finished_at: _optimization_finished_at,
1392            } => {
1393                if df_desc.as_of.as_ref().expect("as of set") == &df_desc.until {
1394                    session.add_notice(AdapterNotice::EqualSubscribeBounds {
1395                        bound: *df_desc.until.as_option().expect("as of set"),
1396                    });
1397                }
1398                coord::sequencer::emit_optimizer_notices(
1399                    &*catalog,
1400                    session,
1401                    &df_meta.optimizer_notices,
1402                );
1403
1404                let response = self
1405                    .call_coordinator(|tx| Command::ExecuteSubscribe {
1406                        df_desc,
1407                        dependency_ids: subscribe_plan.from.depends_on(),
1408                        cluster_id: target_cluster_id,
1409                        replica_id: target_replica,
1410                        conn_id: session.conn_id().clone(),
1411                        session_uuid: session.uuid(),
1412                        read_holds,
1413                        plan: subscribe_plan,
1414                        statement_logging_id,
1415                        tx,
1416                    })
1417                    .await?;
1418                Ok(Some(response))
1419            }
1420            Execution::CopyToS3 {
1421                global_lir_plan,
1422                source_ids,
1423            } => {
1424                let (df_desc, df_meta) = global_lir_plan.unapply();
1425
1426                coord::sequencer::emit_optimizer_notices(
1427                    &*catalog,
1428                    session,
1429                    &df_meta.optimizer_notices,
1430                );
1431
1432                // Extract S3 sink connection info for preflight check
1433                let sink_id = df_desc.sink_id();
1434                let sinks = &df_desc.sink_exports;
1435                if sinks.len() != 1 {
1436                    return Err(AdapterError::Internal(
1437                        "expected exactly one copy to s3 sink".into(),
1438                    ));
1439                }
1440                let (_, sink_desc) = sinks
1441                    .first_key_value()
1442                    .expect("known to be exactly one copy to s3 sink");
1443                let s3_sink_connection = match &sink_desc.connection {
1444                    mz_compute_types::sinks::ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1445                        conn.clone()
1446                    }
1447                    _ => {
1448                        return Err(AdapterError::Internal(
1449                            "expected copy to s3 oneshot sink".into(),
1450                        ));
1451                    }
1452                };
1453
1454                // Perform S3 preflight check in background task (via coordinator).
1455                // This runs slow S3 operations without blocking the coordinator's main task.
1456                self.call_coordinator(|tx| Command::CopyToPreflight {
1457                    s3_sink_connection,
1458                    sink_id,
1459                    tx,
1460                })
1461                .await?;
1462
1463                // Preflight succeeded, now execute the actual COPY TO dataflow
1464                let watch_set = statement_logging_id.map(|logging_id| {
1465                    WatchSetCreation::new(
1466                        logging_id,
1467                        catalog.state(),
1468                        &input_id_bundle,
1469                        determination.timestamp_context.timestamp_or_default(),
1470                    )
1471                });
1472
1473                let response = self
1474                    .call_coordinator(|tx| Command::ExecuteCopyTo {
1475                        df_desc: Box::new(df_desc),
1476                        compute_instance: target_cluster_id,
1477                        target_replica,
1478                        source_ids,
1479                        conn_id: session.conn_id().clone(),
1480                        watch_set,
1481                        tx,
1482                    })
1483                    .await?;
1484
1485                Ok(Some(response))
1486            }
1487        }
1488    }
1489
1490    /// (Similar to Coordinator::determine_timestamp)
1491    /// Determines the timestamp for a query, acquires read holds that ensure the
1492    /// query remains executable at that time, and returns those.
1493    /// The caller is responsible for eventually dropping those read holds.
1494    ///
1495    /// Note: self is taken &mut because of the lazy fetching in `get_compute_instance_client`.
1496    pub(crate) async fn frontend_determine_timestamp(
1497        &mut self,
1498        session: &Session,
1499        id_bundle: &CollectionIdBundle,
1500        when: &QueryWhen,
1501        compute_instance: ComputeInstanceId,
1502        timeline_context: &TimelineContext,
1503        oracle_read_ts: Option<Timestamp>,
1504        real_time_recency_ts: Option<Timestamp>,
1505    ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
1506        // this is copy-pasted from Coordinator
1507
1508        let isolation_level = session.vars().transaction_isolation();
1509
1510        let (read_holds, upper) = self
1511            .acquire_read_holds_and_least_valid_write(id_bundle)
1512            .await
1513            .map_err(|err| {
1514                AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
1515                    err,
1516                    compute_instance,
1517                )
1518            })?;
1519        let (det, read_holds) = <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1520            session,
1521            id_bundle,
1522            when,
1523            timeline_context,
1524            oracle_read_ts,
1525            real_time_recency_ts,
1526            isolation_level,
1527            read_holds,
1528            upper.clone(),
1529        )?;
1530
1531        session
1532            .metrics()
1533            .determine_timestamp(&[
1534                match det.respond_immediately() {
1535                    true => "true",
1536                    false => "false",
1537                },
1538                isolation_level.as_str(),
1539                &compute_instance.to_string(),
1540            ])
1541            .inc();
1542        if !det.respond_immediately()
1543            && isolation_level == &IsolationLevel::StrictSerializable
1544            && real_time_recency_ts.is_none()
1545        {
1546            // Note down the difference between StrictSerializable and Serializable into a metric.
1547            if let Some(strict) = det.timestamp_context.timestamp() {
1548                let (serializable_det, _tmp_read_holds) =
1549                    <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1550                        session,
1551                        id_bundle,
1552                        when,
1553                        timeline_context,
1554                        oracle_read_ts,
1555                        real_time_recency_ts,
1556                        &IsolationLevel::Serializable,
1557                        read_holds.clone(),
1558                        upper,
1559                    )?;
1560                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
1561                    session
1562                        .metrics()
1563                        .timestamp_difference_for_strict_serializable_ms(&[compute_instance
1564                            .to_string()
1565                            .as_ref()])
1566                        .observe(f64::cast_lossy(u64::from(
1567                            strict.saturating_sub(*serializable),
1568                        )));
1569                }
1570            }
1571        }
1572
1573        Ok((det, read_holds))
1574    }
1575
1576    fn assert_read_holds_correct(
1577        read_holds: &ReadHolds,
1578        execution: &Execution,
1579        determination: &TimestampDetermination,
1580        target_cluster_id: ClusterId,
1581        in_immediate_multi_stmt_txn: bool,
1582    ) {
1583        // Extract source_imports, index_imports, as_of, and execution_name based on Execution variant
1584        let (source_imports, index_imports, as_of, execution_name): (
1585            Vec<GlobalId>,
1586            Vec<GlobalId>,
1587            Timestamp,
1588            &str,
1589        ) = match execution {
1590            Execution::Peek {
1591                global_lir_plan, ..
1592            } => match global_lir_plan.peek_plan() {
1593                PeekPlan::FastPath(fast_path_plan) => {
1594                    let (sources, indexes) = match fast_path_plan {
1595                        FastPathPlan::Constant(..) => (vec![], vec![]),
1596                        FastPathPlan::PeekExisting(_coll_id, idx_id, ..) => (vec![], vec![*idx_id]),
1597                        FastPathPlan::PeekPersist(global_id, ..) => (vec![*global_id], vec![]),
1598                    };
1599                    (
1600                        sources,
1601                        indexes,
1602                        determination.timestamp_context.timestamp_or_default(),
1603                        "FastPath",
1604                    )
1605                }
1606                PeekPlan::SlowPath(dataflow_plan) => {
1607                    let as_of = dataflow_plan
1608                        .desc
1609                        .as_of
1610                        .clone()
1611                        .expect("dataflow has an as_of")
1612                        .into_element();
1613                    (
1614                        dataflow_plan.desc.source_imports.keys().cloned().collect(),
1615                        dataflow_plan.desc.index_imports.keys().cloned().collect(),
1616                        as_of,
1617                        "SlowPath",
1618                    )
1619                }
1620            },
1621            Execution::CopyToS3 {
1622                global_lir_plan, ..
1623            } => {
1624                let df_desc = global_lir_plan.df_desc();
1625                let as_of = df_desc
1626                    .as_of
1627                    .clone()
1628                    .expect("dataflow has an as_of")
1629                    .into_element();
1630                (
1631                    df_desc.source_imports.keys().cloned().collect(),
1632                    df_desc.index_imports.keys().cloned().collect(),
1633                    as_of,
1634                    "CopyToS3",
1635                )
1636            }
1637            Execution::ExplainPlan { .. } | Execution::ExplainPushdown { .. } => {
1638                // No read holds assertions needed for EXPLAIN variants
1639                return;
1640            }
1641            Execution::Subscribe { df_desc, .. } => {
1642                let as_of = df_desc
1643                    .as_of
1644                    .clone()
1645                    .expect("dataflow has an as_of")
1646                    .into_element();
1647                (
1648                    df_desc.source_imports.keys().cloned().collect(),
1649                    df_desc.index_imports.keys().cloned().collect(),
1650                    as_of,
1651                    "Subscribe",
1652                )
1653            }
1654        };
1655
1656        // Assert that we have some read holds for all the imports of the dataflow.
1657        for id in source_imports.iter() {
1658            soft_assert_or_log!(
1659                read_holds.storage_holds.contains_key(id),
1660                "[{}] missing read hold for the source import {}; (in_immediate_multi_stmt_txn: {})",
1661                execution_name,
1662                id,
1663                in_immediate_multi_stmt_txn,
1664            );
1665        }
1666        for id in index_imports.iter() {
1667            soft_assert_or_log!(
1668                read_holds
1669                    .compute_ids()
1670                    .map(|(_instance, coll)| coll)
1671                    .contains(id),
1672                "[{}] missing read hold for the index import {}; (in_immediate_multi_stmt_txn: {})",
1673                execution_name,
1674                id,
1675                in_immediate_multi_stmt_txn,
1676            );
1677        }
1678
1679        // Also check the holds against the as_of.
1680        for (id, h) in read_holds.storage_holds.iter() {
1681            soft_assert_or_log!(
1682                h.since().less_equal(&as_of),
1683                "[{}] storage read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1684                execution_name,
1685                h.since(),
1686                id,
1687                as_of,
1688                determination,
1689                in_immediate_multi_stmt_txn,
1690            );
1691        }
1692        for ((instance, id), h) in read_holds.compute_holds.iter() {
1693            soft_assert_eq_or_log!(
1694                *instance,
1695                target_cluster_id,
1696                "[{}] the read hold on {} is on the wrong cluster; (in_immediate_multi_stmt_txn: {})",
1697                execution_name,
1698                id,
1699                in_immediate_multi_stmt_txn,
1700            );
1701            soft_assert_or_log!(
1702                h.since().less_equal(&as_of),
1703                "[{}] compute read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1704                execution_name,
1705                h.since(),
1706                id,
1707                as_of,
1708                determination,
1709                in_immediate_multi_stmt_txn,
1710            );
1711        }
1712    }
1713}
1714
1715/// Enum for branching among various execution steps after optimization
1716enum Execution {
1717    Peek {
1718        global_lir_plan: optimize::peek::GlobalLirPlan,
1719        optimization_finished_at: EpochMillis,
1720        plan_insights_optimizer_trace: Option<OptimizerTrace>,
1721        finishing: RowSetFinishing,
1722        copy_to: Option<plan::CopyFormat>,
1723        insights_ctx: Option<Box<PlanInsightsContext>>,
1724    },
1725    Subscribe {
1726        subscribe_plan: SubscribePlan,
1727        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
1728        df_meta: DataflowMetainfo,
1729        optimization_finished_at: EpochMillis,
1730    },
1731    CopyToS3 {
1732        global_lir_plan: optimize::copy_to::GlobalLirPlan,
1733        source_ids: BTreeSet<GlobalId>,
1734    },
1735    ExplainPlan {
1736        df_meta: DataflowMetainfo,
1737        explain_ctx: ExplainPlanContext,
1738        optimizer: optimize::peek::Optimizer,
1739        insights_ctx: Option<Box<PlanInsightsContext>>,
1740    },
1741    ExplainPushdown {
1742        imports: BTreeMap<GlobalId, mz_expr::MapFilterProject>,
1743        determination: TimestampDetermination,
1744    },
1745}