// mz_adapter/frontend_peek.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::collections::BTreeSet;
12use std::sync::Arc;
13use std::time::Duration;
14
15use itertools::Itertools;
16use mz_adapter_types::dyncfgs::ENABLE_FRONTEND_SUBSCRIBES;
17use mz_compute_types::ComputeInstanceId;
18use mz_compute_types::dataflows::DataflowDescription;
19use mz_controller_types::ClusterId;
20use mz_expr::{CollectionPlan, ResultSpec, RowSetFinishing};
21use mz_ore::cast::{CastFrom, CastLossy};
22use mz_ore::collections::CollectionExt;
23use mz_ore::now::EpochMillis;
24use mz_ore::task::JoinHandle;
25use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
26use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
27use mz_repr::role_id::RoleId;
28use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
29use mz_sql::ast::Raw;
30use mz_sql::catalog::CatalogCluster;
31use mz_sql::plan::Params;
32use mz_sql::plan::{
33    self, Explainee, ExplaineeStatement, Plan, QueryWhen, SelectPlan, SubscribePlan,
34};
35use mz_sql::rbac;
36use mz_sql::session::metadata::SessionMetadata;
37use mz_sql::session::vars::IsolationLevel;
38use mz_sql_parser::ast::{CopyDirection, ExplainStage, ShowStatement, Statement};
39use mz_transform::EmptyStatisticsOracle;
40use mz_transform::dataflow::DataflowMetainfo;
41use opentelemetry::trace::TraceContextExt;
42use timely::progress::Antichain;
43use tracing::{Span, debug, warn};
44use tracing_opentelemetry::OpenTelemetrySpanExt;
45
46use crate::catalog::Catalog;
47use crate::command::Command;
48use crate::coord::peek::{FastPathPlan, PeekPlan};
49use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle};
50use crate::coord::timeline::timedomain_for;
51use crate::coord::timestamp_selection::TimestampDetermination;
52use crate::coord::{
53    Coordinator, CopyToContext, ExecuteContextGuard, ExplainContext, ExplainPlanContext,
54    TargetCluster,
55};
56use crate::explain::insights::PlanInsightsContext;
57use crate::explain::optimizer_trace::OptimizerTrace;
58use crate::optimize::Optimize;
59use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
60use crate::session::{Session, TransactionOps, TransactionStatus};
61use crate::statement_logging::WatchSetCreation;
62use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
63use crate::{
64    AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, PeekClient, ReadHolds,
65    TimelineContext, TimestampContext, TimestampProvider, optimize,
66};
67use crate::{coord, metrics};
68
69impl PeekClient {
    /// Attempt to sequence a peek from the session task.
    ///
    /// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the
    /// Coordinator's sequencing. If it returns an error, it should be returned to the user.
    ///
    /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
    /// triggering the execution of the underlying query.
    ///
    /// This function owns statement-logging bookkeeping for the frontend path: it begins
    /// execution logging (unless an outer statement already did) and, for non-streaming
    /// results, also logs the end of execution before returning. Streaming results are
    /// logged later, when the peek/subscribe finishes (see the match on `result` below).
    pub(crate) async fn try_frontend_peek(
        &mut self,
        portal_name: &str,
        catalog: Option<Arc<Catalog>>,
        session: &mut Session,
        outer_ctx_extra: &mut Option<ExecuteContextGuard>,
    ) -> Result<Option<ExecuteResponse>, AdapterError> {
        // # From handle_execute

        // If requested, surface the current trace id to the client as a notice.
        if session.vars().emit_trace_id_notice() {
            let span_context = tracing::Span::current()
                .context()
                .span()
                .span_context()
                .clone();
            if span_context.is_valid() {
                session.add_notice(AdapterNotice::QueryTrace {
                    trace_id: span_context.trace_id(),
                });
            }
        }

        // TODO(peek-seq): This snapshot is wasted when we end up bailing out from the frontend peek
        // sequencing. We could solve this with the optimization where we
        // continuously keep a catalog snapshot in the session, and only get a new one when the
        // catalog revision has changed, which we could see with an atomic read.
        // But anyhow, this problem will just go away when we reach the point that we never fall
        // back to the old sequencing.
        let catalog = match catalog {
            Some(c) => c,
            None => self.catalog_snapshot("try_frontend_peek").await,
        };

        // Extract things from the portal.
        let (stmt, params, logging, lifecycle_timestamps) = {
            if let Err(err) = Coordinator::verify_portal(&*catalog, session, portal_name) {
                // Retire any outer statement's execution record before bubbling up the error,
                // so the outer statement (e.g., FETCH) isn't left as an orphaned execution.
                outer_ctx_extra
                    .take()
                    .and_then(|guard| guard.defuse().retire());
                return Err(err);
            }
            let portal = session
                .get_portal_unverified(portal_name)
                // The portal is a session-level thing, so it couldn't have concurrently disappeared
                // since the above verification.
                .expect("called verify_portal above");
            let params = portal.parameters.clone();
            let stmt = portal.stmt.clone();
            let logging = Arc::clone(&portal.logging);
            let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
            (stmt, params, logging, lifecycle_timestamps)
        };

        // Before planning, check if this is a statement type we can handle.
        // This must happen BEFORE statement logging setup to avoid orphaned execution records.
        if let Some(ref stmt) = stmt {
            match &**stmt {
                Statement::Select(_)
                | Statement::ExplainAnalyzeObject(_)
                | Statement::ExplainAnalyzeCluster(_)
                | Statement::Show(ShowStatement::ShowObjects(_))
                | Statement::Show(ShowStatement::ShowColumns(_)) => {
                    // These are always fine, just continue.
                    // Note: EXPLAIN ANALYZE will `plan` to `Plan::Select`.
                    // Note: ShowObjects plans to `Plan::Select`, ShowColumns plans to `Plan::ShowColumns`.
                    // We handle `Plan::ShowColumns` specially in `try_frontend_peek_inner`.
                }
                Statement::ExplainPlan(explain_stmt) => {
                    // Only handle ExplainPlan for SELECT statements.
                    // We don't want to handle e.g. EXPLAIN CREATE MATERIALIZED VIEW here, because that
                    // requires purification before planning, which the frontend peek sequencing doesn't
                    // do.
                    match &explain_stmt.explainee {
                        mz_sql_parser::ast::Explainee::Select(..) => {
                            // This is a SELECT, continue
                        }
                        _ => {
                            debug!(
                                "Bailing out from try_frontend_peek, because EXPLAIN is not for a SELECT query"
                            );
                            return Ok(None);
                        }
                    }
                }
                Statement::ExplainPushdown(explain_stmt) => {
                    // Only handle EXPLAIN FILTER PUSHDOWN for non-BROKEN SELECT statements.
                    // (The second field of `Explainee::Select` is the BROKEN flag — presumably;
                    // TODO(review): confirm against the AST definition.)
                    match &explain_stmt.explainee {
                        mz_sql_parser::ast::Explainee::Select(_, false) => {}
                        _ => {
                            debug!(
                                "Bailing out from try_frontend_peek, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is for EXPLAIN BROKEN"
                            );
                            return Ok(None);
                        }
                    }
                }
                Statement::Copy(copy_stmt) => {
                    match &copy_stmt.direction {
                        CopyDirection::To => {
                            // This is COPY TO (...), continue
                        }
                        CopyDirection::From => {
                            debug!(
                                "Bailing out from try_frontend_peek, because COPY FROM is not supported"
                            );
                            return Ok(None);
                        }
                    }
                }

                Statement::Subscribe(_)
                    if ENABLE_FRONTEND_SUBSCRIBES.get(catalog.system_config().dyncfgs()) =>
                {
                    // We have a subscribe statement to process; continue.
                }
                _ => {
                    debug!(
                        "Bailing out from try_frontend_peek, because statement type is not supported"
                    );
                    return Ok(None);
                }
            }
        }

        // Set up statement logging, and log the beginning of execution.
        // (But only if we're not executing in the context of another statement.)
        let statement_logging_id = if outer_ctx_extra.is_none() {
            // This is a new statement, so begin statement logging.
            // `begin_statement_execution` returns None when this statement is not sampled
            // for logging — presumably; TODO(review): confirm against its definition.
            let result = self.statement_logging_frontend.begin_statement_execution(
                session,
                &params,
                &logging,
                catalog.system_config(),
                lifecycle_timestamps,
            );

            if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result {
                self.log_began_execution(began_execution, mseh_update, prepared_statement);
                Some(logging_id)
            } else {
                None
            }
        } else {
            // We're executing in the context of another statement (e.g., FETCH),
            // so extract the statement logging ID from the outer context if present.
            // We take ownership and retire the outer context here. The end of execution will be
            // logged in one of the following ways:
            // - At the end of this function, if the execution is finished by then.
            // - Later by the Coordinator, either due to RegisterFrontendPeek or ExecuteSlowPathPeek.
            outer_ctx_extra
                .take()
                .and_then(|guard| guard.defuse().retire())
        };

        let result = self
            .try_frontend_peek_inner(session, catalog, stmt, params, statement_logging_id)
            .await;

        // Log the end of execution if we are logging this statement and execution has already
        // ended.
        if let Some(logging_id) = statement_logging_id {
            let reason = match &result {
                // Streaming results are handled asynchronously by the coordinator
                Ok(Some(
                    ExecuteResponse::SendingRowsStreaming { .. }
                    | ExecuteResponse::Subscribing { .. },
                )) => {
                    // Don't log here - the peek or subscribe is still executing.
                    // It will be logged when handle_peek_notification is called.
                    return result;
                }
                // COPY TO needs to check its inner response
                Ok(Some(resp @ ExecuteResponse::CopyTo { resp: inner, .. })) => {
                    match inner.as_ref() {
                        ExecuteResponse::SendingRowsStreaming { .. }
                        | ExecuteResponse::Subscribing { .. } => {
                            // Don't log here - the peek or subscribe is still executing.
                            // It will be logged when handle_peek_notification is called.
                            return result;
                        }
                        // For non-streaming COPY TO responses, use the outer CopyTo for conversion
                        _ => resp.into(),
                    }
                }
                // Bailout case, which should not happen
                Ok(None) => {
                    soft_panic_or_log!(
                        "Bailed out from `try_frontend_peek_inner` after we already logged the beginning of statement execution."
                    );
                    // This statement will be handled by the old peek sequencing, which will do its
                    // own statement logging from the beginning. So, let's close out this one.
                    self.log_ended_execution(
                        logging_id,
                        StatementEndedExecutionReason::Errored {
                            error: "Internal error: bailed out from `try_frontend_peek_inner`"
                                .to_string(),
                        },
                    );
                    return result;
                }
                // All other success responses - use the From implementation
                // (converting `&ExecuteResponse` into a `StatementEndedExecutionReason`).
                // TODO(peek-seq): After we delete the old peek sequencing, we'll be able to adjust
                // the From implementation to do exactly what we need in the frontend peek
                // sequencing, so that the above special cases won't be needed.
                Ok(Some(resp)) => resp.into(),
                Err(e) => StatementEndedExecutionReason::Errored {
                    error: e.to_string(),
                },
            };

            self.log_ended_execution(logging_id, reason);
        }

        result
    }
292
293    /// This is encapsulated in an inner function so that the outer function can still do statement
294    /// logging after the `?` returns of the inner function.
295    async fn try_frontend_peek_inner(
296        &mut self,
297        session: &mut Session,
298        catalog: Arc<Catalog>,
299        stmt: Option<Arc<Statement<Raw>>>,
300        params: Params,
301        statement_logging_id: Option<crate::statement_logging::StatementLoggingId>,
302    ) -> Result<Option<ExecuteResponse>, AdapterError> {
303        let stmt = match stmt {
304            Some(stmt) => stmt,
305            None => {
306                debug!("try_frontend_peek_inner succeeded on an empty query");
307                return Ok(Some(ExecuteResponse::EmptyQuery));
308            }
309        };
310
311        session
312            .metrics()
313            .query_total(&[
314                metrics::session_type_label_value(session.user()),
315                metrics::statement_type_label_value(&stmt),
316            ])
317            .inc();
318
319        // # From handle_execute_inner
320
321        let conn_catalog = catalog.for_session(session);
322        // (`resolved_ids` should be derivable from `stmt`. If `stmt` is later transformed to
323        // remove/add IDs, then `resolved_ids` should be updated to also remove/add those IDs.)
324        let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?;
325
326        let pcx = session.pcx();
327        let plan = mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, &params, &resolved_ids)?;
328
329        /// What do we do with the result of the select?
330        enum QueryPlan<'a> {
331            Select(&'a SelectPlan),
332            CopyTo(&'a SelectPlan, CopyToContext),
333            Subscribe(&'a SubscribePlan),
334        }
335
336        let (query_plan, explain_ctx) = match &plan {
337            Plan::Select(select_plan) => {
338                let explain_ctx = if session.vars().emit_plan_insights_notice() {
339                    let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
340                    ExplainContext::PlanInsightsNotice(optimizer_trace)
341                } else {
342                    ExplainContext::None
343                };
344                (QueryPlan::Select(select_plan), explain_ctx)
345            }
346            Plan::ShowColumns(show_columns_plan) => {
347                // ShowColumns wraps a SelectPlan, extract it and proceed as normal.
348                (
349                    QueryPlan::Select(&show_columns_plan.select_plan),
350                    ExplainContext::None,
351                )
352            }
353            Plan::ExplainPlan(plan::ExplainPlanPlan {
354                stage,
355                format,
356                config,
357                explainee: Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc }),
358            }) => {
359                // Create OptimizerTrace to collect optimizer plans
360                let optimizer_trace = OptimizerTrace::new(stage.paths());
361                let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
362                    broken: *broken,
363                    config: config.clone(),
364                    format: *format,
365                    stage: *stage,
366                    replan: None,
367                    desc: Some(desc.clone()),
368                    optimizer_trace,
369                });
370                (QueryPlan::Select(plan), explain_ctx)
371            }
372            // COPY TO S3
373            Plan::CopyTo(plan::CopyToPlan {
374                select_plan,
375                desc,
376                to,
377                connection,
378                connection_id,
379                format,
380                max_file_size,
381            }) => {
382                let uri = eval_copy_to_uri(to.clone(), session, catalog.state())?;
383
384                // (output_batch_count will be set later)
385                let copy_to_ctx = CopyToContext {
386                    desc: desc.clone(),
387                    uri,
388                    connection: connection.clone(),
389                    connection_id: *connection_id,
390                    format: format.clone(),
391                    max_file_size: *max_file_size,
392                    output_batch_count: None,
393                };
394
395                (
396                    QueryPlan::CopyTo(select_plan, copy_to_ctx),
397                    ExplainContext::None,
398                )
399            }
400            Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => {
401                // Only handle EXPLAIN FILTER PUSHDOWN for SELECT statements
402                match explainee {
403                    plan::Explainee::Statement(plan::ExplaineeStatement::Select {
404                        broken: false,
405                        plan,
406                        desc: _,
407                    }) => {
408                        let explain_ctx = ExplainContext::Pushdown;
409                        (QueryPlan::Select(plan), explain_ctx)
410                    }
411                    _ => {
412                        // This shouldn't happen because we already checked for this at the AST
413                        // level before calling `try_frontend_peek_inner`.
414                        soft_panic_or_log!(
415                            "unexpected EXPLAIN FILTER PUSHDOWN plan kind in frontend peek sequencing: {:?}",
416                            explainee
417                        );
418                        debug!(
419                            "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN"
420                        );
421                        return Ok(None);
422                    }
423                }
424            }
425            Plan::SideEffectingFunc(sef_plan) => {
426                // Side-effecting functions need Coordinator state (e.g., active_conns),
427                // so delegate to the Coordinator via a Command.
428                // The RBAC check is performed in the Coordinator where active_conns is available.
429                let response = self
430                    .call_coordinator(|tx| Command::ExecuteSideEffectingFunc {
431                        plan: sef_plan.clone(),
432                        conn_id: session.conn_id().clone(),
433                        current_role: session.role_metadata().current_role,
434                        tx,
435                    })
436                    .await?;
437                return Ok(Some(response));
438            }
439            Plan::Subscribe(subscribe) => (QueryPlan::Subscribe(subscribe), ExplainContext::None),
440            _ => {
441                // This shouldn't happen because we already checked for this at the AST
442                // level before calling `try_frontend_peek_inner`.
443                soft_panic_or_log!(
444                    "Unexpected plan kind in frontend peek sequencing: {:?}",
445                    plan
446                );
447                debug!(
448                    "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, side-effecting SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO S3"
449                );
450                return Ok(None);
451            }
452        };
453
454        let when = match query_plan {
455            QueryPlan::Select(s) => &s.when,
456            QueryPlan::CopyTo(s, _) => &s.when,
457            QueryPlan::Subscribe(s) => &s.when,
458        };
459
460        let depends_on = match query_plan {
461            QueryPlan::Select(s) => s.source.depends_on(),
462            QueryPlan::CopyTo(s, _) => s.source.depends_on(),
463            QueryPlan::Subscribe(s) => s.from.depends_on(),
464        };
465
466        let contains_temporal = match query_plan {
467            QueryPlan::Select(s) => s.source.contains_temporal(),
468            QueryPlan::CopyTo(s, _) => s.source.contains_temporal(),
469            QueryPlan::Subscribe(s) => Ok(s.from.contains_temporal()),
470        };
471
472        // # From sequence_plan
473
474        // We have checked the plan kind above.
475        assert!(plan.allowed_in_read_only());
476
477        let (cluster, target_cluster_id, target_cluster_name) = {
478            let target_cluster = match session.transaction().cluster() {
479                // Use the current transaction's cluster.
480                Some(cluster_id) => TargetCluster::Transaction(cluster_id),
481                // If there isn't a current cluster set for a transaction, then try to auto route.
482                None => coord::catalog_serving::auto_run_on_catalog_server(
483                    &conn_catalog,
484                    session,
485                    &plan,
486                ),
487            };
488            let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
489            (cluster, cluster.id, &cluster.name)
490        };
491
492        // Log cluster selection
493        if let Some(logging_id) = &statement_logging_id {
494            self.log_set_cluster(*logging_id, target_cluster_id);
495        }
496
497        coord::catalog_serving::check_cluster_restrictions(
498            target_cluster_name.as_str(),
499            &conn_catalog,
500            &plan,
501        )?;
502
503        rbac::check_plan(
504            &conn_catalog,
505            // We can't look at `active_conns` here, but that's ok, because this case was handled
506            // above already inside `Command::ExecuteSideEffectingFunc`.
507            None::<fn(u32) -> Option<RoleId>>,
508            session,
509            &plan,
510            Some(target_cluster_id),
511            &resolved_ids,
512        )?;
513
514        if let Some((_, wait_future)) =
515            coord::appends::waiting_on_startup_appends(&*catalog, session, &plan)
516        {
517            wait_future.await;
518        }
519
520        let max_query_result_size = Some(session.vars().max_query_result_size());
521
522        // # From sequence_peek
523
524        // # From peek_validate
525
526        let compute_instance_snapshot =
527            ComputeInstanceSnapshot::new_without_collections(cluster.id());
528
529        let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config())
530            .override_from(&catalog.get_cluster(cluster.id()).config.features())
531            .override_from(&explain_ctx);
532
533        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
534            return Err(AdapterError::NoClusterReplicasAvailable {
535                name: cluster.name.clone(),
536                is_managed: cluster.is_managed(),
537            });
538        }
539
540        let (_, view_id) = self.transient_id_gen.allocate_id();
541        let (_, index_id) = self.transient_id_gen.allocate_id();
542
543        let target_replica_name = session.vars().cluster_replica();
544        let mut target_replica = target_replica_name
545            .map(|name| {
546                cluster
547                    .replica_id(name)
548                    .ok_or(AdapterError::UnknownClusterReplica {
549                        cluster_name: cluster.name.clone(),
550                        replica_name: name.to_string(),
551                    })
552            })
553            .transpose()?;
554
555        let source_ids = depends_on;
556        // TODO(peek-seq): validate_timeline_context can be expensive in real scenarios (not in
557        // simple benchmarks), because it traverses transitive dependencies even of indexed views and
558        // materialized views (also traversing their MIR plans).
559        let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?;
560        if matches!(timeline_context, TimelineContext::TimestampIndependent) && contains_temporal? {
561            // If the source IDs are timestamp independent but the query contains temporal functions,
562            // then the timeline context needs to be upgraded to timestamp dependent. This is
563            // required because `source_ids` doesn't contain functions.
564            timeline_context = TimelineContext::TimestampDependent;
565        }
566
567        let notices = coord::sequencer::check_log_reads(
568            &catalog,
569            cluster,
570            &source_ids,
571            &mut target_replica,
572            session.vars(),
573        )?;
574        session.add_notices(notices);
575
576        // # From peek_linearize_timestamp
577
578        let isolation_level = session.vars().transaction_isolation().clone();
579        let timeline = Coordinator::get_timeline(&timeline_context);
580        let needs_linearized_read_ts =
581            Coordinator::needs_linearized_read_ts(&isolation_level, when);
582
583        let oracle_read_ts = match timeline {
584            Some(timeline) if needs_linearized_read_ts => {
585                let oracle = self.ensure_oracle(timeline).await?;
586                let oracle_read_ts = oracle.read_ts().await;
587                Some(oracle_read_ts)
588            }
589            Some(_) | None => None,
590        };
591
592        // # From peek_real_time_recency
593
594        let vars = session.vars();
595        let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
596            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
597            && !session.contains_read_timestamp()
598        {
599            // Only call the coordinator when we actually need real-time recency
600            self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
601                source_ids: source_ids.clone(),
602                real_time_recency_timeout: *vars.real_time_recency_timeout(),
603                tx,
604            })
605            .await?
606        } else {
607            None
608        };
609
610        // # From peek_timestamp_read_hold
611
612        let dataflow_builder =
613            DataflowBuilder::new(catalog.state(), compute_instance_snapshot.clone());
614        let input_id_bundle = dataflow_builder.sufficient_collections(source_ids.clone());
615
616        // ## From sequence_peek_timestamp
617
618        // Warning: This will be false for AS OF queries, even if we are otherwise inside a
619        // multi-statement transaction. (It's also false for FreshestTableWrite, which is currently
620        // only read-then-write queries, which can't be part of multi-statement transactions, so
621        // FreshestTableWrite doesn't matter.)
622        //
623        // TODO(peek-seq): It's not totally clear to me what the intended semantics are for AS OF
624        // queries inside a transaction: We clearly can't use the transaction timestamp, but the old
625        // peek sequencing still does a timedomain validation. The new peek sequencing does not do
626        // timedomain validation for AS OF queries, which seems more natural. But I'm thinking that
627        // it would be the cleanest to just simply disallow AS OF queries inside transactions.
628        let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when)
629            && !matches!(query_plan, QueryPlan::Subscribe { .. });
630
631        // Fetch or generate a timestamp for this query and fetch or acquire read holds.
632        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
633            // Use the transaction's timestamp if it exists and this isn't an AS OF query.
634            // (`in_immediate_multi_stmt_txn` is false for AS OF queries.)
635            Some(
636                determination @ TimestampDetermination {
637                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
638                    ..
639                },
640            ) if in_immediate_multi_stmt_txn => {
641                // This is a subsequent (non-AS OF, non-constant) query in a multi-statement
642                // transaction. We now:
643                // - Validate that the query only accesses collections within the transaction's
644                //   timedomain (which we know from the stored read holds).
645                // - Use the transaction's stored timestamp determination.
646                // - Use the (relevant subset of the) transaction's read holds.
647
648                let txn_read_holds_opt = self
649                    .call_coordinator(|tx| Command::GetTransactionReadHoldsBundle {
650                        conn_id: session.conn_id().clone(),
651                        tx,
652                    })
653                    .await;
654
655                if let Some(txn_read_holds) = txn_read_holds_opt {
656                    let allowed_id_bundle = txn_read_holds.id_bundle();
657                    let outside = input_id_bundle.difference(&allowed_id_bundle);
658
659                    // Queries without a timestamp and timeline can belong to any existing timedomain.
660                    if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
661                        let valid_names =
662                            allowed_id_bundle.resolve_names(&*catalog, session.conn_id());
663                        let invalid_names = outside.resolve_names(&*catalog, session.conn_id());
664                        return Err(AdapterError::RelationOutsideTimeDomain {
665                            relations: invalid_names,
666                            names: valid_names,
667                        });
668                    }
669
670                    // Extract the subset of read holds for the collections this query accesses.
671                    let read_holds = txn_read_holds.subset(&input_id_bundle);
672
673                    (determination, read_holds)
674                } else {
675                    // This should never happen: we're in a subsequent query of a multi-statement
676                    // transaction (we have a transaction timestamp), but the coordinator has no
677                    // transaction read holds stored. This indicates a bug in the transaction
678                    // handling.
679                    return Err(AdapterError::Internal(
680                        "Missing transaction read holds for multi-statement transaction"
681                            .to_string(),
682                    ));
683                }
684            }
685            _ => {
686                // There is no timestamp determination yet for this transaction. Either:
687                // - We are not in a multi-statement transaction.
688                // - This is the first (non-AS OF) query in a multi-statement transaction.
689                // - This is an AS OF query.
690                // - This is a constant query (`TimestampContext::NoTimestamp`).
691
692                let timedomain_bundle;
693                let determine_bundle = if in_immediate_multi_stmt_txn {
694                    // This is the first (non-AS OF) query in a multi-statement transaction.
695                    // Determine a timestamp that will be valid for anything in any schema
696                    // referenced by the first query.
697                    timedomain_bundle = timedomain_for(
698                        &*catalog,
699                        &dataflow_builder,
700                        &source_ids,
701                        &timeline_context,
702                        session.conn_id(),
703                        target_cluster_id,
704                    )?;
705                    &timedomain_bundle
706                } else {
707                    // Simply use the inputs of the current query.
708                    &input_id_bundle
709                };
710                let (determination, read_holds) = self
711                    .frontend_determine_timestamp(
712                        session,
713                        determine_bundle,
714                        when,
715                        target_cluster_id,
716                        &timeline_context,
717                        oracle_read_ts,
718                        real_time_recency_ts,
719                    )
720                    .await?;
721
722                // If this is the first (non-AS OF) query in a multi-statement transaction, store
723                // the read holds in the coordinator, so subsequent queries can validate against
724                // them.
725                if in_immediate_multi_stmt_txn {
726                    self.call_coordinator(|tx| Command::StoreTransactionReadHolds {
727                        conn_id: session.conn_id().clone(),
728                        read_holds: read_holds.clone(),
729                        tx,
730                    })
731                    .await;
732                }
733
734                (determination, read_holds)
735            }
736        };
737
738        {
739            // Assert that we have a read hold for all the collections in our `input_id_bundle`.
740            for id in input_id_bundle.iter() {
741                let s = read_holds.storage_holds.contains_key(&id);
742                let c = read_holds
743                    .compute_ids()
744                    .map(|(_instance, coll)| coll)
745                    .contains(&id);
746                soft_assert_or_log!(
747                    s || c,
748                    "missing read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
749                    id,
750                    in_immediate_multi_stmt_txn,
751                );
752            }
753
754            // Assert that each part of the `input_id_bundle` corresponds to the right part of
755            // `read_holds`.
756            for id in input_id_bundle.storage_ids.iter() {
757                soft_assert_or_log!(
758                    read_holds.storage_holds.contains_key(id),
759                    "missing storage read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
760                    id,
761                    in_immediate_multi_stmt_txn,
762                );
763            }
764            for id in input_id_bundle
765                .compute_ids
766                .iter()
767                .flat_map(|(_instance, colls)| colls)
768            {
769                soft_assert_or_log!(
770                    read_holds
771                        .compute_ids()
772                        .map(|(_instance, coll)| coll)
773                        .contains(id),
774                    "missing compute read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
775                    id,
776                    in_immediate_multi_stmt_txn,
777                );
778            }
779        }
780
781        // (TODO(peek-seq): The below TODO is copied from the old peek sequencing. We should resolve
782        // this when we decide what to do with `AS OF` in transactions.)
783        // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
784        // arbitrary and we don't recall why we did it (possibly an error!). Change this to always
785        // set the transaction ops. Decide and document what our policy should be on AS OF queries.
786        // Maybe they shouldn't be allowed in transactions at all because it's hard to explain
787        // what's going on there. This should probably get a small design document.
788
789        // We only track the peeks in the session if the query doesn't use AS
790        // OF or we're inside an explicit transaction. The latter case is
791        // necessary to support PG's `BEGIN` semantics, whose behavior can
792        // depend on whether or not reads have occurred in the txn.
793        let requires_linearization = (&explain_ctx).into();
794        let mut transaction_determination = determination.clone();
795        match query_plan {
796            QueryPlan::Subscribe { .. } => {
797                if when.is_transactional() {
798                    session.add_transaction_ops(TransactionOps::Subscribe)?;
799                }
800            }
801            QueryPlan::Select(..) | QueryPlan::CopyTo(..) => {
802                if when.is_transactional() {
803                    session.add_transaction_ops(TransactionOps::Peeks {
804                        determination: transaction_determination,
805                        cluster_id: target_cluster_id,
806                        requires_linearization,
807                    })?;
808                } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
809                    // If the query uses AS OF, then ignore the timestamp.
810                    transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
811                    session.add_transaction_ops(TransactionOps::Peeks {
812                        determination: transaction_determination,
813                        cluster_id: target_cluster_id,
814                        requires_linearization,
815                    })?;
816                }
817            }
818        }
819
820        // # From peek_optimize
821
822        let stats = statistics_oracle(
823            session,
824            &source_ids,
825            &determination.timestamp_context.antichain(),
826            true,
827            catalog.system_config(),
828            &*self.storage_collections,
829        )
830        .await
831        .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
832
833        // Generate data structures that can be moved to another task where we will perform possibly
834        // expensive optimizations.
835        let timestamp_context = determination.timestamp_context.clone();
836        let session_meta = session.meta();
837        let now = catalog.config().now.clone();
838        let target_cluster_name = target_cluster_name.clone();
839        let needs_plan_insights = explain_ctx.needs_plan_insights();
840        let determination_for_pushdown = if matches!(explain_ctx, ExplainContext::Pushdown) {
841            // This is a hairy data structure, so avoid this clone if we are not in
842            // EXPLAIN FILTER PUSHDOWN.
843            Some(determination.clone())
844        } else {
845            None
846        };
847
848        let span = Span::current();
849
850        // Prepare data for plan insights if needed
851        let catalog_for_insights = if needs_plan_insights {
852            Some(Arc::clone(&catalog))
853        } else {
854            None
855        };
856        let mut compute_instances = BTreeMap::new();
857        if needs_plan_insights {
858            for user_cluster in catalog.user_clusters() {
859                let snapshot = ComputeInstanceSnapshot::new_without_collections(user_cluster.id);
860                compute_instances.insert(user_cluster.name.clone(), snapshot);
861            }
862        }
863
864        let source_ids_for_closure = source_ids.clone();
865
866        let optimization_future: JoinHandle<Result<_, AdapterError>> = match query_plan {
867            QueryPlan::CopyTo(select_plan, mut copy_to_ctx) => {
868                let raw_expr = select_plan.source.clone();
869
870                // COPY TO path: calculate output_batch_count and create copy_to optimizer
871                let worker_counts = cluster.replicas().map(|r| {
872                    let loc = &r.config.location;
873                    loc.workers().unwrap_or_else(|| loc.num_processes())
874                });
875                let max_worker_count = match worker_counts.max() {
876                    Some(count) => u64::cast_from(count),
877                    None => {
878                        return Err(AdapterError::NoClusterReplicasAvailable {
879                            name: cluster.name.clone(),
880                            is_managed: cluster.is_managed(),
881                        });
882                    }
883                };
884                copy_to_ctx.output_batch_count = Some(max_worker_count);
885
886                let mut optimizer = optimize::copy_to::Optimizer::new(
887                    Arc::clone(&catalog),
888                    compute_instance_snapshot,
889                    view_id,
890                    copy_to_ctx,
891                    optimizer_config,
892                    self.optimizer_metrics.clone(),
893                );
894
895                mz_ore::task::spawn_blocking(
896                    || "optimize copy-to",
897                    move || {
898                        span.in_scope(|| {
899                            let _dispatch_guard = explain_ctx.dispatch_guard();
900
901                            // COPY TO path
902                            // HIR ⇒ MIR lowering and MIR optimization (local)
903                            let local_mir_plan =
904                                optimizer.catch_unwind_optimize(raw_expr.clone())?;
905                            // Attach resolved context required to continue the pipeline.
906                            let local_mir_plan = local_mir_plan.resolve(
907                                timestamp_context.clone(),
908                                &session_meta,
909                                stats,
910                            );
911                            // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
912                            let global_lir_plan =
913                                optimizer.catch_unwind_optimize(local_mir_plan)?;
914                            Ok(Execution::CopyToS3 {
915                                global_lir_plan,
916                                source_ids: source_ids_for_closure,
917                            })
918                        })
919                    },
920                )
921            }
922            QueryPlan::Select(select_plan) => {
923                let select_plan = select_plan.clone();
924                let raw_expr = select_plan.source.clone();
925
926                // SELECT/EXPLAIN path: create peek optimizer
927                let mut optimizer = optimize::peek::Optimizer::new(
928                    Arc::clone(&catalog),
929                    compute_instance_snapshot,
930                    select_plan.finishing.clone(),
931                    view_id,
932                    index_id,
933                    optimizer_config,
934                    self.optimizer_metrics.clone(),
935                );
936
937                mz_ore::task::spawn_blocking(
938                    || "optimize peek",
939                    move || {
940                        span.in_scope(|| {
941                            let _dispatch_guard = explain_ctx.dispatch_guard();
942
943                            // SELECT/EXPLAIN path
944                            // HIR ⇒ MIR lowering and MIR optimization (local)
945
946                            // The purpose of wrapping the following in a closure is to control where the
947                            // `?`s return from, so that even when a `catch_unwind_optimize` call fails,
948                            // we can still handle `EXPLAIN BROKEN`.
949                            let pipeline = || {
950                                let local_mir_plan =
951                                    optimizer.catch_unwind_optimize(raw_expr.clone())?;
952                                // Attach resolved context required to continue the pipeline.
953                                let local_mir_plan = local_mir_plan.resolve(
954                                    timestamp_context.clone(),
955                                    &session_meta,
956                                    stats,
957                                );
958                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
959                                let global_lir_plan =
960                                    optimizer.catch_unwind_optimize(local_mir_plan)?;
961                                Ok::<_, AdapterError>(global_lir_plan)
962                            };
963
964                            let global_lir_plan_result = pipeline();
965                            let optimization_finished_at = now();
966
967                            let create_insights_ctx =
968                                |optimizer: &optimize::peek::Optimizer,
969                                 is_notice: bool|
970                                 -> Option<Box<PlanInsightsContext>> {
971                                    if !needs_plan_insights {
972                                        return None;
973                                    }
974
975                                    let catalog = catalog_for_insights.as_ref()?;
976
977                                    let enable_re_optimize = if needs_plan_insights {
978                                        // Disable any plan insights that use the optimizer if we only want the
979                                        // notice and plan optimization took longer than the threshold. This is
980                                        // to prevent a situation where optimizing takes a while and there are
981                                        // lots of clusters, which would delay peek execution by the product of
982                                        // those.
983                                        //
984                                        // (This heuristic doesn't work well, see #9492.)
985                                        let dyncfgs = catalog.system_config().dyncfgs();
986                                        let opt_limit = mz_adapter_types::dyncfgs
987                                        ::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
988                                            .get(dyncfgs);
989                                        !(is_notice && optimizer.duration() > opt_limit)
990                                    } else {
991                                        false
992                                    };
993
994                                    Some(Box::new(PlanInsightsContext {
995                                        stmt: select_plan
996                                            .select
997                                            .as_deref()
998                                            .map(Clone::clone)
999                                            .map(Statement::Select),
1000                                        raw_expr: raw_expr.clone(),
1001                                        catalog: Arc::clone(catalog),
1002                                        compute_instances,
1003                                        target_instance: target_cluster_name,
1004                                        metrics: optimizer.metrics().clone(),
1005                                        finishing: optimizer.finishing().clone(),
1006                                        optimizer_config: optimizer.config().clone(),
1007                                        session: session_meta,
1008                                        timestamp_context,
1009                                        view_id: optimizer.select_id(),
1010                                        index_id: optimizer.index_id(),
1011                                        enable_re_optimize,
1012                                    }))
1013                                };
1014
1015                            let global_lir_plan = match global_lir_plan_result {
1016                                Ok(plan) => plan,
1017                                Err(err) => {
1018                                    let result = if let ExplainContext::Plan(explain_ctx) =
1019                                        explain_ctx
1020                                        && explain_ctx.broken
1021                                    {
1022                                        // EXPLAIN BROKEN: log error and continue with defaults
1023                                        tracing::error!(
1024                                            "error while handling EXPLAIN statement: {}",
1025                                            err
1026                                        );
1027                                        Ok(Execution::ExplainPlan {
1028                                            df_meta: Default::default(),
1029                                            explain_ctx,
1030                                            optimizer,
1031                                            insights_ctx: None,
1032                                        })
1033                                    } else {
1034                                        Err(err)
1035                                    };
1036                                    return result;
1037                                }
1038                            };
1039
1040                            match explain_ctx {
1041                                ExplainContext::Plan(explain_ctx) => {
1042                                    let (_, df_meta, _) = global_lir_plan.unapply();
1043                                    let insights_ctx = create_insights_ctx(&optimizer, false);
1044                                    Ok(Execution::ExplainPlan {
1045                                        df_meta,
1046                                        explain_ctx,
1047                                        optimizer,
1048                                        insights_ctx,
1049                                    })
1050                                }
1051                                ExplainContext::None => Ok(Execution::Peek {
1052                                    global_lir_plan,
1053                                    optimization_finished_at,
1054                                    plan_insights_optimizer_trace: None,
1055                                    finishing: select_plan.finishing,
1056                                    copy_to: select_plan.copy_to,
1057                                    insights_ctx: None,
1058                                }),
1059                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
1060                                    let insights_ctx = create_insights_ctx(&optimizer, true);
1061                                    Ok(Execution::Peek {
1062                                        global_lir_plan,
1063                                        optimization_finished_at,
1064                                        plan_insights_optimizer_trace: Some(optimizer_trace),
1065                                        finishing: select_plan.finishing,
1066                                        copy_to: select_plan.copy_to,
1067                                        insights_ctx,
1068                                    })
1069                                }
1070                                ExplainContext::Pushdown => {
1071                                    let (plan, _, _) = global_lir_plan.unapply();
1072                                    let imports = match plan {
1073                                        PeekPlan::SlowPath(plan) => plan
1074                                            .desc
1075                                            .source_imports
1076                                            .into_iter()
1077                                            .filter_map(|(id, import)| {
1078                                                import.desc.arguments.operators.map(|mfp| (id, mfp))
1079                                            })
1080                                            .collect(),
1081                                        PeekPlan::FastPath(_) => {
1082                                            std::collections::BTreeMap::default()
1083                                        }
1084                                    };
1085                                    Ok(Execution::ExplainPushdown {
1086                                        imports,
1087                                        determination: determination_for_pushdown
1088                                            .expect("it's present for the ExplainPushdown case"),
1089                                    })
1090                                }
1091                            }
1092                        })
1093                    },
1094                )
1095            }
1096            QueryPlan::Subscribe(plan) => {
1097                let plan = plan.clone();
1098                let catalog: Arc<Catalog> = Arc::clone(&catalog);
1099                let debug_name = format!("subscribe-{}", index_id);
1100                let mut optimizer = optimize::subscribe::Optimizer::new(
1101                    catalog,
1102                    compute_instance_snapshot.clone(),
1103                    view_id,
1104                    index_id,
1105                    plan.with_snapshot,
1106                    plan.up_to,
1107                    debug_name,
1108                    optimizer_config,
1109                    self.optimizer_metrics.clone(),
1110                );
1111                mz_ore::task::spawn_blocking(
1112                    || "optimize subscribe",
1113                    move || {
1114                        span.in_scope(|| {
1115                            let _dispatch_guard = explain_ctx.dispatch_guard();
1116
1117                            let global_mir_plan = optimizer.catch_unwind_optimize(plan.clone())?;
1118                            let as_of = timestamp_context.timestamp_or_default();
1119
1120                            if let Some(up_to) = optimizer.up_to() {
1121                                if as_of > up_to {
1122                                    return Err(AdapterError::AbsurdSubscribeBounds {
1123                                        as_of,
1124                                        up_to,
1125                                    });
1126                                }
1127                            }
1128                            let local_mir_plan =
1129                                global_mir_plan.resolve(Antichain::from_elem(as_of));
1130
1131                            let global_lir_plan =
1132                                optimizer.catch_unwind_optimize(local_mir_plan)?;
1133                            let optimization_finished_at = now();
1134
1135                            let (df_desc, df_meta) = global_lir_plan.unapply();
1136                            Ok(Execution::Subscribe {
1137                                subscribe_plan: plan,
1138                                df_desc,
1139                                df_meta,
1140                                optimization_finished_at,
1141                            })
1142                        })
1143                    },
1144                )
1145            }
1146        };
1147
1148        let mut optimization_timeout = *session.vars().statement_timeout();
1149        // Timeout of 0 is equivalent to "off", meaning we will wait "forever."
1150        if optimization_timeout == Duration::ZERO {
1151            optimization_timeout = Duration::MAX;
1152        }
1153        let optimization_result =
1154            // Note: spawn_blocking tasks cannot be cancelled, so on timeout we stop waiting but the
1155            // optimization task continues running in the background until completion. See
1156            // https://github.com/MaterializeInc/database-issues/issues/8644 for properly cancelling
1157            // optimizer runs.
1158            match tokio::time::timeout(optimization_timeout, optimization_future).await {
1159                Ok(Ok(result)) => result,
1160                Ok(Err(AdapterError::Optimizer(err))) => {
1161                    return Err(AdapterError::Internal(format!(
1162                        "internal error in optimizer: {}",
1163                        err
1164                    )));
1165                }
1166                Ok(Err(err)) => {
1167                    return Err(err);
1168                }
1169                Err(_elapsed) => {
1170                    warn!("optimize peek timed out after {:?}", optimization_timeout);
1171                    return Err(AdapterError::StatementTimeout);
1172                }
1173            };
1174
1175        // Log optimization finished
1176        if let Some(logging_id) = &statement_logging_id {
1177            self.log_lifecycle_event(*logging_id, StatementLifecycleEvent::OptimizationFinished);
1178        }
1179
1180        // Assert that read holds are correct for the execution plan
1181        Self::assert_read_holds_correct(
1182            &read_holds,
1183            &optimization_result,
1184            &determination,
1185            target_cluster_id,
1186            in_immediate_multi_stmt_txn,
1187        );
1188
1189        // Handle the optimization result: either generate EXPLAIN output or continue with execution
1190        match optimization_result {
1191            Execution::ExplainPlan {
1192                df_meta,
1193                explain_ctx,
1194                optimizer,
1195                insights_ctx,
1196            } => {
1197                let rows = coord::sequencer::explain_plan_inner(
1198                    session,
1199                    &catalog,
1200                    df_meta,
1201                    explain_ctx,
1202                    optimizer,
1203                    insights_ctx,
1204                )
1205                .await?;
1206
1207                Ok(Some(ExecuteResponse::SendingRowsImmediate {
1208                    rows: Box::new(rows.into_row_iter()),
1209                }))
1210            }
1211            Execution::ExplainPushdown {
1212                imports,
1213                determination,
1214            } => {
1215                // # From peek_explain_pushdown
1216
1217                let as_of = determination.timestamp_context.antichain();
1218                let mz_now = determination
1219                    .timestamp_context
1220                    .timestamp()
1221                    .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1222                    .unwrap_or_else(ResultSpec::value_all);
1223
1224                Ok(Some(
1225                    coord::sequencer::explain_pushdown_future_inner(
1226                        session,
1227                        &*catalog,
1228                        &self.storage_collections,
1229                        as_of,
1230                        mz_now,
1231                        imports,
1232                    )
1233                    .await
1234                    .await?,
1235                ))
1236            }
1237            Execution::Peek {
1238                global_lir_plan,
1239                optimization_finished_at: _optimization_finished_at,
1240                plan_insights_optimizer_trace,
1241                finishing,
1242                copy_to,
1243                insights_ctx,
1244            } => {
1245                // Continue with normal execution
1246                // # From peek_finish
1247
1248                // The typ here was generated from the HIR SQL type and simply stored in LIR.
1249                let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
1250
1251                coord::sequencer::emit_optimizer_notices(
1252                    &*catalog,
1253                    session,
1254                    &df_meta.optimizer_notices,
1255                );
1256
1257                // Generate plan insights notice if needed
1258                if let Some(trace) = plan_insights_optimizer_trace {
1259                    let target_cluster = catalog.get_cluster(target_cluster_id);
1260                    let features = OptimizerFeatures::from(catalog.system_config())
1261                        .override_from(&target_cluster.config.features());
1262                    let insights = trace
1263                        .into_plan_insights(
1264                            &features,
1265                            &catalog.for_session(session),
1266                            Some(finishing.clone()),
1267                            Some(target_cluster),
1268                            df_meta.clone(),
1269                            insights_ctx,
1270                        )
1271                        .await?;
1272                    session.add_notice(AdapterNotice::PlanInsights(insights));
1273                }
1274
1275                // # Now back to peek_finish
1276
1277                let watch_set = statement_logging_id.map(|logging_id| {
1278                    WatchSetCreation::new(
1279                        logging_id,
1280                        catalog.state(),
1281                        &input_id_bundle,
1282                        determination.timestamp_context.timestamp_or_default(),
1283                    )
1284                });
1285
1286                let max_result_size = catalog.system_config().max_result_size();
1287
1288                // Clone determination if we need it for emit_timestamp_notice, since it may be
1289                // moved into Command::ExecuteSlowPathPeek.
1290                let determination_for_notice = if session.vars().emit_timestamp_notice() {
1291                    Some(determination.clone())
1292                } else {
1293                    None
1294                };
1295
1296                let response = match peek_plan {
1297                    PeekPlan::FastPath(fast_path_plan) => {
1298                        if let Some(logging_id) = &statement_logging_id {
1299                            // TODO(peek-seq): Actually, we should log it also for
1300                            // FastPathPlan::Constant. The only reason we are not doing so at the
1301                            // moment is to match the old peek sequencing, so that statement logging
1302                            // tests pass with the frontend peek sequencing turned both on and off.
1303                            //
1304                            // When the old sequencing is removed, we should make a couple of
1305                            // changes in how we log timestamps:
1306                            // - Move this up to just after timestamp determination, so that it
1307                            //   appears in the log as soon as possible.
1308                            // - Do it also for Constant peeks.
1309                            // - Currently, slow-path peeks' timestamp logging is done by
1310                            //   `implement_peek_plan`. We could remove it from there, and just do
1311                            //   it here.
1312                            if !matches!(fast_path_plan, FastPathPlan::Constant(..)) {
1313                                self.log_set_timestamp(
1314                                    *logging_id,
1315                                    determination.timestamp_context.timestamp_or_default(),
1316                                );
1317                            }
1318                        }
1319
1320                        let row_set_finishing_seconds =
1321                            session.metrics().row_set_finishing_seconds().clone();
1322
1323                        let peek_stash_read_batch_size_bytes =
1324                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
1325                                .get(catalog.system_config().dyncfgs());
1326                        let peek_stash_read_memory_budget_bytes =
1327                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
1328                                .get(catalog.system_config().dyncfgs());
1329
1330                        self.implement_fast_path_peek_plan(
1331                            fast_path_plan,
1332                            determination.timestamp_context.timestamp_or_default(),
1333                            finishing,
1334                            target_cluster_id,
1335                            target_replica,
1336                            typ,
1337                            max_result_size,
1338                            max_query_result_size,
1339                            row_set_finishing_seconds,
1340                            read_holds,
1341                            peek_stash_read_batch_size_bytes,
1342                            peek_stash_read_memory_budget_bytes,
1343                            session.conn_id().clone(),
1344                            source_ids,
1345                            watch_set,
1346                        )
1347                        .await?
1348                    }
1349                    PeekPlan::SlowPath(dataflow_plan) => {
1350                        if let Some(logging_id) = &statement_logging_id {
1351                            self.log_set_transient_index_id(*logging_id, dataflow_plan.id);
1352                        }
1353
1354                        self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
1355                            dataflow_plan: Box::new(dataflow_plan),
1356                            determination,
1357                            finishing,
1358                            compute_instance: target_cluster_id,
1359                            target_replica,
1360                            intermediate_result_type: typ,
1361                            source_ids,
1362                            conn_id: session.conn_id().clone(),
1363                            max_result_size,
1364                            max_query_result_size,
1365                            watch_set,
1366                            tx,
1367                        })
1368                        .await?
1369                    }
1370                };
1371
1372                // Add timestamp notice if emit_timestamp_notice is enabled
1373                if let Some(determination) = determination_for_notice {
1374                    let explanation = self
1375                        .call_coordinator(|tx| Command::ExplainTimestamp {
1376                            conn_id: session.conn_id().clone(),
1377                            session_wall_time: session.pcx().wall_time,
1378                            cluster_id: target_cluster_id,
1379                            id_bundle: input_id_bundle.clone(),
1380                            determination,
1381                            tx,
1382                        })
1383                        .await;
1384                    session.add_notice(AdapterNotice::QueryTimestamp { explanation });
1385                }
1386
1387                Ok(Some(match copy_to {
1388                    None => response,
1389                    // COPY TO STDOUT
1390                    Some(format) => ExecuteResponse::CopyTo {
1391                        format,
1392                        resp: Box::new(response),
1393                    },
1394                }))
1395            }
1396            Execution::Subscribe {
1397                subscribe_plan,
1398                df_desc,
1399                df_meta,
1400                optimization_finished_at: _optimization_finished_at,
1401            } => {
1402                if df_desc.as_of.as_ref().expect("as of set") == &df_desc.until {
1403                    session.add_notice(AdapterNotice::EqualSubscribeBounds {
1404                        bound: *df_desc.until.as_option().expect("as of set"),
1405                    });
1406                }
1407                coord::sequencer::emit_optimizer_notices(
1408                    &*catalog,
1409                    session,
1410                    &df_meta.optimizer_notices,
1411                );
1412
1413                let response = self
1414                    .call_coordinator(|tx| Command::ExecuteSubscribe {
1415                        df_desc,
1416                        dependency_ids: subscribe_plan.from.depends_on(),
1417                        cluster_id: target_cluster_id,
1418                        replica_id: target_replica,
1419                        conn_id: session.conn_id().clone(),
1420                        session_uuid: session.uuid(),
1421                        read_holds,
1422                        plan: subscribe_plan,
1423                        statement_logging_id,
1424                        tx,
1425                    })
1426                    .await?;
1427                Ok(Some(response))
1428            }
1429            Execution::CopyToS3 {
1430                global_lir_plan,
1431                source_ids,
1432            } => {
1433                let (df_desc, df_meta) = global_lir_plan.unapply();
1434
1435                coord::sequencer::emit_optimizer_notices(
1436                    &*catalog,
1437                    session,
1438                    &df_meta.optimizer_notices,
1439                );
1440
1441                // Extract S3 sink connection info for preflight check
1442                let sink_id = df_desc.sink_id();
1443                let sinks = &df_desc.sink_exports;
1444                if sinks.len() != 1 {
1445                    return Err(AdapterError::Internal(
1446                        "expected exactly one copy to s3 sink".into(),
1447                    ));
1448                }
1449                let (_, sink_desc) = sinks
1450                    .first_key_value()
1451                    .expect("known to be exactly one copy to s3 sink");
1452                let s3_sink_connection = match &sink_desc.connection {
1453                    mz_compute_types::sinks::ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1454                        conn.clone()
1455                    }
1456                    _ => {
1457                        return Err(AdapterError::Internal(
1458                            "expected copy to s3 oneshot sink".into(),
1459                        ));
1460                    }
1461                };
1462
1463                // Perform S3 preflight check in background task (via coordinator).
1464                // This runs slow S3 operations without blocking the coordinator's main task.
1465                self.call_coordinator(|tx| Command::CopyToPreflight {
1466                    s3_sink_connection,
1467                    sink_id,
1468                    tx,
1469                })
1470                .await?;
1471
1472                // Preflight succeeded, now execute the actual COPY TO dataflow
1473                let watch_set = statement_logging_id.map(|logging_id| {
1474                    WatchSetCreation::new(
1475                        logging_id,
1476                        catalog.state(),
1477                        &input_id_bundle,
1478                        determination.timestamp_context.timestamp_or_default(),
1479                    )
1480                });
1481
1482                let response = self
1483                    .call_coordinator(|tx| Command::ExecuteCopyTo {
1484                        df_desc: Box::new(df_desc),
1485                        compute_instance: target_cluster_id,
1486                        target_replica,
1487                        source_ids,
1488                        conn_id: session.conn_id().clone(),
1489                        watch_set,
1490                        tx,
1491                    })
1492                    .await?;
1493
1494                Ok(Some(response))
1495            }
1496        }
1497    }
1498
1499    /// (Similar to Coordinator::determine_timestamp)
1500    /// Determines the timestamp for a query, acquires read holds that ensure the
1501    /// query remains executable at that time, and returns those.
1502    /// The caller is responsible for eventually dropping those read holds.
1503    ///
1504    /// Note: self is taken &mut because of the lazy fetching in `get_compute_instance_client`.
1505    pub(crate) async fn frontend_determine_timestamp(
1506        &mut self,
1507        session: &Session,
1508        id_bundle: &CollectionIdBundle,
1509        when: &QueryWhen,
1510        compute_instance: ComputeInstanceId,
1511        timeline_context: &TimelineContext,
1512        oracle_read_ts: Option<Timestamp>,
1513        real_time_recency_ts: Option<Timestamp>,
1514    ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
1515        // this is copy-pasted from Coordinator
1516
1517        let isolation_level = session.vars().transaction_isolation();
1518
1519        let (read_holds, upper) = self
1520            .acquire_read_holds_and_least_valid_write(id_bundle)
1521            .await
1522            .map_err(|err| {
1523                AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
1524                    err,
1525                    compute_instance,
1526                )
1527            })?;
1528        let (det, read_holds) = <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1529            session,
1530            id_bundle,
1531            when,
1532            timeline_context,
1533            oracle_read_ts,
1534            real_time_recency_ts,
1535            isolation_level,
1536            read_holds,
1537            upper.clone(),
1538        )?;
1539
1540        session
1541            .metrics()
1542            .determine_timestamp(&[
1543                match det.respond_immediately() {
1544                    true => "true",
1545                    false => "false",
1546                },
1547                isolation_level.as_str(),
1548                &compute_instance.to_string(),
1549            ])
1550            .inc();
1551        if !det.respond_immediately()
1552            && isolation_level == &IsolationLevel::StrictSerializable
1553            && real_time_recency_ts.is_none()
1554        {
1555            // Note down the difference between StrictSerializable and Serializable into a metric.
1556            if let Some(strict) = det.timestamp_context.timestamp() {
1557                let (serializable_det, _tmp_read_holds) =
1558                    <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1559                        session,
1560                        id_bundle,
1561                        when,
1562                        timeline_context,
1563                        oracle_read_ts,
1564                        real_time_recency_ts,
1565                        &IsolationLevel::Serializable,
1566                        read_holds.clone(),
1567                        upper,
1568                    )?;
1569                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
1570                    session
1571                        .metrics()
1572                        .timestamp_difference_for_strict_serializable_ms(&[compute_instance
1573                            .to_string()
1574                            .as_ref()])
1575                        .observe(f64::cast_lossy(u64::from(
1576                            strict.saturating_sub(*serializable),
1577                        )));
1578                }
1579            }
1580        }
1581
1582        Ok((det, read_holds))
1583    }
1584
1585    fn assert_read_holds_correct(
1586        read_holds: &ReadHolds,
1587        execution: &Execution,
1588        determination: &TimestampDetermination,
1589        target_cluster_id: ClusterId,
1590        in_immediate_multi_stmt_txn: bool,
1591    ) {
1592        // Extract source_imports, index_imports, as_of, and execution_name based on Execution variant
1593        let (source_imports, index_imports, as_of, execution_name): (
1594            Vec<GlobalId>,
1595            Vec<GlobalId>,
1596            Timestamp,
1597            &str,
1598        ) = match execution {
1599            Execution::Peek {
1600                global_lir_plan, ..
1601            } => match global_lir_plan.peek_plan() {
1602                PeekPlan::FastPath(fast_path_plan) => {
1603                    let (sources, indexes) = match fast_path_plan {
1604                        FastPathPlan::Constant(..) => (vec![], vec![]),
1605                        FastPathPlan::PeekExisting(_coll_id, idx_id, ..) => (vec![], vec![*idx_id]),
1606                        FastPathPlan::PeekPersist(global_id, ..) => (vec![*global_id], vec![]),
1607                    };
1608                    (
1609                        sources,
1610                        indexes,
1611                        determination.timestamp_context.timestamp_or_default(),
1612                        "FastPath",
1613                    )
1614                }
1615                PeekPlan::SlowPath(dataflow_plan) => {
1616                    let as_of = dataflow_plan
1617                        .desc
1618                        .as_of
1619                        .clone()
1620                        .expect("dataflow has an as_of")
1621                        .into_element();
1622                    (
1623                        dataflow_plan.desc.source_imports.keys().cloned().collect(),
1624                        dataflow_plan.desc.index_imports.keys().cloned().collect(),
1625                        as_of,
1626                        "SlowPath",
1627                    )
1628                }
1629            },
1630            Execution::CopyToS3 {
1631                global_lir_plan, ..
1632            } => {
1633                let df_desc = global_lir_plan.df_desc();
1634                let as_of = df_desc
1635                    .as_of
1636                    .clone()
1637                    .expect("dataflow has an as_of")
1638                    .into_element();
1639                (
1640                    df_desc.source_imports.keys().cloned().collect(),
1641                    df_desc.index_imports.keys().cloned().collect(),
1642                    as_of,
1643                    "CopyToS3",
1644                )
1645            }
1646            Execution::ExplainPlan { .. } | Execution::ExplainPushdown { .. } => {
1647                // No read holds assertions needed for EXPLAIN variants
1648                return;
1649            }
1650            Execution::Subscribe { df_desc, .. } => {
1651                let as_of = df_desc
1652                    .as_of
1653                    .clone()
1654                    .expect("dataflow has an as_of")
1655                    .into_element();
1656                (
1657                    df_desc.source_imports.keys().cloned().collect(),
1658                    df_desc.index_imports.keys().cloned().collect(),
1659                    as_of,
1660                    "Subscribe",
1661                )
1662            }
1663        };
1664
1665        // Assert that we have some read holds for all the imports of the dataflow.
1666        for id in source_imports.iter() {
1667            soft_assert_or_log!(
1668                read_holds.storage_holds.contains_key(id),
1669                "[{}] missing read hold for the source import {}; (in_immediate_multi_stmt_txn: {})",
1670                execution_name,
1671                id,
1672                in_immediate_multi_stmt_txn,
1673            );
1674        }
1675        for id in index_imports.iter() {
1676            soft_assert_or_log!(
1677                read_holds
1678                    .compute_ids()
1679                    .map(|(_instance, coll)| coll)
1680                    .contains(id),
1681                "[{}] missing read hold for the index import {}; (in_immediate_multi_stmt_txn: {})",
1682                execution_name,
1683                id,
1684                in_immediate_multi_stmt_txn,
1685            );
1686        }
1687
1688        // Also check the holds against the as_of.
1689        for (id, h) in read_holds.storage_holds.iter() {
1690            soft_assert_or_log!(
1691                h.since().less_equal(&as_of),
1692                "[{}] storage read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1693                execution_name,
1694                h.since(),
1695                id,
1696                as_of,
1697                determination,
1698                in_immediate_multi_stmt_txn,
1699            );
1700        }
1701        for ((instance, id), h) in read_holds.compute_holds.iter() {
1702            soft_assert_eq_or_log!(
1703                *instance,
1704                target_cluster_id,
1705                "[{}] the read hold on {} is on the wrong cluster; (in_immediate_multi_stmt_txn: {})",
1706                execution_name,
1707                id,
1708                in_immediate_multi_stmt_txn,
1709            );
1710            soft_assert_or_log!(
1711                h.since().less_equal(&as_of),
1712                "[{}] compute read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1713                execution_name,
1714                h.since(),
1715                id,
1716                as_of,
1717                determination,
1718                in_immediate_multi_stmt_txn,
1719            );
1720        }
1721    }
1722}
1723
1724/// Enum for branching among various execution steps after optimization
1725enum Execution {
1726    Peek {
1727        global_lir_plan: optimize::peek::GlobalLirPlan,
1728        optimization_finished_at: EpochMillis,
1729        plan_insights_optimizer_trace: Option<OptimizerTrace>,
1730        finishing: RowSetFinishing,
1731        copy_to: Option<plan::CopyFormat>,
1732        insights_ctx: Option<Box<PlanInsightsContext>>,
1733    },
1734    Subscribe {
1735        subscribe_plan: SubscribePlan,
1736        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
1737        df_meta: DataflowMetainfo,
1738        optimization_finished_at: EpochMillis,
1739    },
1740    CopyToS3 {
1741        global_lir_plan: optimize::copy_to::GlobalLirPlan,
1742        source_ids: BTreeSet<GlobalId>,
1743    },
1744    ExplainPlan {
1745        df_meta: DataflowMetainfo,
1746        explain_ctx: ExplainPlanContext,
1747        optimizer: optimize::peek::Optimizer,
1748        insights_ctx: Option<Box<PlanInsightsContext>>,
1749    },
1750    ExplainPushdown {
1751        imports: BTreeMap<GlobalId, mz_expr::MapFilterProject>,
1752        determination: TimestampDetermination,
1753    },
1754}