// mz_adapter/frontend_peek.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

10use std::collections::BTreeMap;
11use std::collections::BTreeSet;
12use std::sync::Arc;
13use std::time::Duration;
14
15use itertools::Itertools;
16use mz_adapter_types::dyncfgs::ENABLE_FRONTEND_SUBSCRIBES;
17use mz_compute_types::ComputeInstanceId;
18use mz_compute_types::dataflows::DataflowDescription;
19use mz_controller_types::ClusterId;
20use mz_expr::{CollectionPlan, ResultSpec, RowSetFinishing};
21use mz_ore::cast::{CastFrom, CastLossy};
22use mz_ore::collections::CollectionExt;
23use mz_ore::now::EpochMillis;
24use mz_ore::task::JoinHandle;
25use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
26use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
27use mz_repr::role_id::RoleId;
28use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
29use mz_sql::ast::Raw;
30use mz_sql::catalog::CatalogCluster;
31use mz_sql::plan::Params;
32use mz_sql::plan::{
33    self, Explainee, ExplaineeStatement, Plan, QueryWhen, SelectPlan, SubscribePlan,
34};
35use mz_sql::rbac;
36use mz_sql::session::metadata::SessionMetadata;
37use mz_sql::session::vars::IsolationLevel;
38use mz_sql_parser::ast::{CopyDirection, ExplainStage, ShowStatement, Statement};
39use mz_transform::EmptyStatisticsOracle;
40use mz_transform::dataflow::DataflowMetainfo;
41use opentelemetry::trace::TraceContextExt;
42use timely::progress::Antichain;
43use tracing::{Span, debug, warn};
44use tracing_opentelemetry::OpenTelemetrySpanExt;
45
46use crate::catalog::Catalog;
47use crate::command::Command;
48use crate::coord::peek::{FastPathPlan, PeekPlan};
49use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle};
50use crate::coord::timeline::timedomain_for;
51use crate::coord::timestamp_selection::TimestampDetermination;
52use crate::coord::{
53    Coordinator, CopyToContext, ExecuteContextGuard, ExplainContext, ExplainPlanContext,
54    TargetCluster,
55};
56use crate::explain::insights::PlanInsightsContext;
57use crate::explain::optimizer_trace::OptimizerTrace;
58use crate::optimize::Optimize;
59use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
60use crate::session::{Session, TransactionOps, TransactionStatus};
61use crate::statement_logging::WatchSetCreation;
62use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
63use crate::{
64    AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, PeekClient, ReadHolds,
65    TimelineContext, TimestampContext, TimestampProvider, optimize,
66};
67use crate::{coord, metrics};
68
69impl PeekClient {
    /// Attempt to sequence a peek from the session task.
    ///
    /// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the
    /// Coordinator's sequencing. If it returns an error, it should be returned to the user.
    ///
    /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
    /// triggering the execution of the underlying query.
    pub(crate) async fn try_frontend_peek(
        &mut self,
        portal_name: &str,
        catalog: Option<Arc<Catalog>>,
        session: &mut Session,
        outer_ctx_extra: &mut Option<ExecuteContextGuard>,
    ) -> Result<Option<ExecuteResponse>, AdapterError> {
        // # From handle_execute

        // If requested, surface the current trace id to the client as a notice so the
        // query can be correlated with its traces.
        if session.vars().emit_trace_id_notice() {
            let span_context = tracing::Span::current()
                .context()
                .span()
                .span_context()
                .clone();
            if span_context.is_valid() {
                session.add_notice(AdapterNotice::QueryTrace {
                    trace_id: span_context.trace_id(),
                });
            }
        }

        // TODO(peek-seq): This snapshot is wasted when we end up bailing out from the frontend peek
        // sequencing. We could solve this with that optimization where we
        // continuously keep a catalog snapshot in the session, and only get a new one when the
        // catalog revision has changed, which we could see with an atomic read.
        // But anyhow, this problem will just go away when we reach the point that we never fall
        // back to the old sequencing.
        let catalog = match catalog {
            Some(c) => c,
            None => self.catalog_snapshot("try_frontend_peek").await,
        };

        // Extract things from the portal.
        let (stmt, params, logging, lifecycle_timestamps) = {
            if let Err(err) = Coordinator::verify_portal(&*catalog, session, portal_name) {
                // Retire the outer statement's execution context (if any) before surfacing
                // the error, so we don't leave an orphaned statement-logging record.
                outer_ctx_extra
                    .take()
                    .and_then(|guard| guard.defuse().retire());
                return Err(err);
            }
            let portal = session
                .get_portal_unverified(portal_name)
                // The portal is a session-level thing, so it couldn't have concurrently disappeared
                // since the above verification.
                .expect("called verify_portal above");
            let params = portal.parameters.clone();
            let stmt = portal.stmt.clone();
            let logging = Arc::clone(&portal.logging);
            let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
            (stmt, params, logging, lifecycle_timestamps)
        };

        // Before planning, check if this is a statement type we can handle.
        // This must happen BEFORE statement logging setup to avoid orphaned execution records.
        if let Some(ref stmt) = stmt {
            match &**stmt {
                Statement::Select(_)
                | Statement::ExplainAnalyzeObject(_)
                | Statement::ExplainAnalyzeCluster(_)
                | Statement::Show(ShowStatement::ShowObjects(_))
                | Statement::Show(ShowStatement::ShowColumns(_)) => {
                    // These are always fine, just continue.
                    // Note: EXPLAIN ANALYZE will `plan` to `Plan::Select`.
                    // Note: ShowObjects plans to `Plan::Select`, ShowColumns plans to `Plan::ShowColumns`.
                    // We handle `Plan::ShowColumns` specially in `try_frontend_peek_inner`.
                }
                Statement::ExplainPlan(explain_stmt) => {
                    // Only handle ExplainPlan for SELECT statements.
                    // We don't want to handle e.g. EXPLAIN CREATE MATERIALIZED VIEW here, because that
                    // requires purification before planning, which the frontend peek sequencing doesn't
                    // do.
                    match &explain_stmt.explainee {
                        mz_sql_parser::ast::Explainee::Select(..) => {
                            // This is a SELECT, continue
                        }
                        _ => {
                            debug!(
                                "Bailing out from try_frontend_peek, because EXPLAIN is not for a SELECT query"
                            );
                            return Ok(None);
                        }
                    }
                }
                Statement::ExplainPushdown(explain_stmt) => {
                    // Only handle EXPLAIN FILTER PUSHDOWN for non-BROKEN SELECT statements.
                    // (The second field of `Explainee::Select` is the `broken` flag.)
                    match &explain_stmt.explainee {
                        mz_sql_parser::ast::Explainee::Select(_, false) => {}
                        _ => {
                            debug!(
                                "Bailing out from try_frontend_peek, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is for EXPLAIN BROKEN"
                            );
                            return Ok(None);
                        }
                    }
                }
                Statement::Copy(copy_stmt) => {
                    match &copy_stmt.direction {
                        CopyDirection::To => {
                            // This is COPY TO (...), continue
                        }
                        CopyDirection::From => {
                            debug!(
                                "Bailing out from try_frontend_peek, because COPY FROM is not supported"
                            );
                            return Ok(None);
                        }
                    }
                }

                // Subscribes are only handled in the frontend when the dyncfg enables it.
                Statement::Subscribe(_)
                    if ENABLE_FRONTEND_SUBSCRIBES.get(catalog.system_config().dyncfgs()) =>
                {
                    // We have a subscribe statement to process; continue.
                }
                _ => {
                    debug!(
                        "Bailing out from try_frontend_peek, because statement type is not supported"
                    );
                    return Ok(None);
                }
            }
        }

        // Set up statement logging, and log the beginning of execution.
        // (But only if we're not executing in the context of another statement.)
        let statement_logging_id = if outer_ctx_extra.is_none() {
            // This is a new statement, so begin statement logging
            let result = self.statement_logging_frontend.begin_statement_execution(
                session,
                &params,
                &logging,
                catalog.system_config(),
                lifecycle_timestamps,
            );

            // `begin_statement_execution` returns None when this statement is not
            // sampled for logging.
            if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result {
                self.log_began_execution(began_execution, mseh_update, prepared_statement);
                Some(logging_id)
            } else {
                None
            }
        } else {
            // We're executing in the context of another statement (e.g., FETCH),
            // so extract the statement logging ID from the outer context if present.
            // We take ownership and retire the outer context here. The end of execution will be
            // logged in one of the following ways:
            // - At the end of this function, if the execution is finished by then.
            // - Later by the Coordinator, either due to RegisterFrontendPeek or ExecuteSlowPathPeek.
            outer_ctx_extra
                .take()
                .and_then(|guard| guard.defuse().retire())
        };

        // Do the actual sequencing. Errors from `?` inside the inner function come back
        // here as `Err`, so the end-of-execution logging below still runs for them.
        let result = self
            .try_frontend_peek_inner(session, catalog, stmt, params, statement_logging_id)
            .await;

        // Log the end of execution if we are logging this statement and execution has already
        // ended.
        if let Some(logging_id) = statement_logging_id {
            let reason = match &result {
                // Streaming results are handled asynchronously by the coordinator
                Ok(Some(
                    ExecuteResponse::SendingRowsStreaming { .. }
                    | ExecuteResponse::Subscribing { .. },
                )) => {
                    // Don't log here - the peek or subscribe is still executing.
                    // It will be logged when handle_peek_notification is called.
                    return result;
                }
                // COPY TO needs to check its inner response
                Ok(Some(resp @ ExecuteResponse::CopyTo { resp: inner, .. })) => {
                    match inner.as_ref() {
                        ExecuteResponse::SendingRowsStreaming { .. }
                        | ExecuteResponse::Subscribing { .. } => {
                            // Don't log here - the peek or subscribe is still executing.
                            // It will be logged when handle_peek_notification is called.
                            return result;
                        }
                        // For non-streaming COPY TO responses, use the outer CopyTo for conversion
                        _ => resp.into(),
                    }
                }
                // Bailout case, which should not happen
                Ok(None) => {
                    soft_panic_or_log!(
                        "Bailed out from `try_frontend_peek_inner` after we already logged the beginning of statement execution."
                    );
                    // This statement will be handled by the old peek sequencing, which will do its
                    // own statement logging from the beginning. So, let's close out this one.
                    self.log_ended_execution(
                        logging_id,
                        StatementEndedExecutionReason::Errored {
                            error: "Internal error: bailed out from `try_frontend_peek_inner`"
                                .to_string(),
                        },
                    );
                    return result;
                }
                // All other success responses - use the From implementation
                // TODO(peek-seq): After we delete the old peek sequencing, we'll be able to adjust
                // the From implementation to do exactly what we need in the frontend peek
                // sequencing, so that the above special cases won't be needed.
                Ok(Some(resp)) => resp.into(),
                Err(e) => StatementEndedExecutionReason::Errored {
                    error: e.to_string(),
                },
            };

            self.log_ended_execution(logging_id, reason);
        }

        result
    }
292
293    /// This is encapsulated in an inner function so that the outer function can still do statement
294    /// logging after the `?` returns of the inner function.
295    async fn try_frontend_peek_inner(
296        &mut self,
297        session: &mut Session,
298        catalog: Arc<Catalog>,
299        stmt: Option<Arc<Statement<Raw>>>,
300        params: Params,
301        statement_logging_id: Option<crate::statement_logging::StatementLoggingId>,
302    ) -> Result<Option<ExecuteResponse>, AdapterError> {
303        let stmt = match stmt {
304            Some(stmt) => stmt,
305            None => {
306                debug!("try_frontend_peek_inner succeeded on an empty query");
307                return Ok(Some(ExecuteResponse::EmptyQuery));
308            }
309        };
310
311        session
312            .metrics()
313            .query_total(&[
314                metrics::session_type_label_value(session.user()),
315                metrics::statement_type_label_value(&stmt),
316            ])
317            .inc();
318
319        // # From handle_execute_inner
320
321        let conn_catalog = catalog.for_session(session);
322        // (`resolved_ids` should be derivable from `stmt`. If `stmt` is later transformed to
323        // remove/add IDs, then `resolved_ids` should be updated to also remove/add those IDs.)
324        let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?;
325
326        let pcx = session.pcx();
327        let (plan, sql_impl_ids) =
328            mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, &params, &resolved_ids)?;
329
330        /// What do we do with the result of the select?
331        enum QueryPlan<'a> {
332            Select(&'a SelectPlan),
333            CopyTo(&'a SelectPlan, CopyToContext),
334            Subscribe(&'a SubscribePlan),
335        }
336
337        let (query_plan, explain_ctx) = match &plan {
338            Plan::Select(select_plan) => {
339                let explain_ctx = if session.vars().emit_plan_insights_notice() {
340                    let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
341                    ExplainContext::PlanInsightsNotice(optimizer_trace)
342                } else {
343                    ExplainContext::None
344                };
345                (QueryPlan::Select(select_plan), explain_ctx)
346            }
347            Plan::ShowColumns(show_columns_plan) => {
348                // ShowColumns wraps a SelectPlan, extract it and proceed as normal.
349                (
350                    QueryPlan::Select(&show_columns_plan.select_plan),
351                    ExplainContext::None,
352                )
353            }
354            Plan::ExplainPlan(plan::ExplainPlanPlan {
355                stage,
356                format,
357                config,
358                explainee: Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc }),
359            }) => {
360                // Create OptimizerTrace to collect optimizer plans
361                let optimizer_trace = OptimizerTrace::new(stage.paths());
362                let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
363                    broken: *broken,
364                    config: config.clone(),
365                    format: *format,
366                    stage: *stage,
367                    replan: None,
368                    desc: Some(desc.clone()),
369                    optimizer_trace,
370                });
371                (QueryPlan::Select(plan), explain_ctx)
372            }
373            // COPY TO S3
374            Plan::CopyTo(plan::CopyToPlan {
375                select_plan,
376                desc,
377                to,
378                connection,
379                connection_id,
380                format,
381                max_file_size,
382            }) => {
383                let uri = eval_copy_to_uri(to.clone(), session, catalog.state())?;
384
385                // (output_batch_count will be set later)
386                let copy_to_ctx = CopyToContext {
387                    desc: desc.clone(),
388                    uri,
389                    connection: connection.clone(),
390                    connection_id: *connection_id,
391                    format: format.clone(),
392                    max_file_size: *max_file_size,
393                    output_batch_count: None,
394                };
395
396                (
397                    QueryPlan::CopyTo(select_plan, copy_to_ctx),
398                    ExplainContext::None,
399                )
400            }
401            Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => {
402                // Only handle EXPLAIN FILTER PUSHDOWN for SELECT statements
403                match explainee {
404                    plan::Explainee::Statement(plan::ExplaineeStatement::Select {
405                        broken: false,
406                        plan,
407                        desc: _,
408                    }) => {
409                        let explain_ctx = ExplainContext::Pushdown;
410                        (QueryPlan::Select(plan), explain_ctx)
411                    }
412                    _ => {
413                        // This shouldn't happen because we already checked for this at the AST
414                        // level before calling `try_frontend_peek_inner`.
415                        soft_panic_or_log!(
416                            "unexpected EXPLAIN FILTER PUSHDOWN plan kind in frontend peek sequencing: {:?}",
417                            explainee
418                        );
419                        debug!(
420                            "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN"
421                        );
422                        return Ok(None);
423                    }
424                }
425            }
426            Plan::SideEffectingFunc(sef_plan) => {
427                // Side-effecting functions need Coordinator state (e.g., active_conns),
428                // so delegate to the Coordinator via a Command.
429                // The RBAC check is performed in the Coordinator where active_conns is available.
430                let response = self
431                    .call_coordinator(|tx| Command::ExecuteSideEffectingFunc {
432                        plan: sef_plan.clone(),
433                        conn_id: session.conn_id().clone(),
434                        current_role: session.role_metadata().current_role,
435                        tx,
436                    })
437                    .await?;
438                return Ok(Some(response));
439            }
440            Plan::Subscribe(subscribe) => (QueryPlan::Subscribe(subscribe), ExplainContext::None),
441            _ => {
442                // This shouldn't happen because we already checked for this at the AST
443                // level before calling `try_frontend_peek_inner`.
444                soft_panic_or_log!(
445                    "Unexpected plan kind in frontend peek sequencing: {:?}",
446                    plan
447                );
448                debug!(
449                    "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, side-effecting SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO S3"
450                );
451                return Ok(None);
452            }
453        };
454
455        let when = match query_plan {
456            QueryPlan::Select(s) => &s.when,
457            QueryPlan::CopyTo(s, _) => &s.when,
458            QueryPlan::Subscribe(s) => &s.when,
459        };
460
461        let depends_on = match query_plan {
462            QueryPlan::Select(s) => s.source.depends_on(),
463            QueryPlan::CopyTo(s, _) => s.source.depends_on(),
464            QueryPlan::Subscribe(s) => s.from.depends_on(),
465        };
466
467        let contains_temporal = match query_plan {
468            QueryPlan::Select(s) => s.source.contains_temporal(),
469            QueryPlan::CopyTo(s, _) => s.source.contains_temporal(),
470            QueryPlan::Subscribe(s) => Ok(s.from.contains_temporal()),
471        };
472
473        // # From sequence_plan
474
475        // We have checked the plan kind above.
476        assert!(plan.allowed_in_read_only());
477
478        let (cluster, target_cluster_id, target_cluster_name) = {
479            let target_cluster = match session.transaction().cluster() {
480                // Use the current transaction's cluster.
481                Some(cluster_id) => TargetCluster::Transaction(cluster_id),
482                // If there isn't a current cluster set for a transaction, then try to auto route.
483                None => coord::catalog_serving::auto_run_on_catalog_server(
484                    &conn_catalog,
485                    session,
486                    &plan,
487                ),
488            };
489            let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
490            (cluster, cluster.id, &cluster.name)
491        };
492
493        // Log cluster selection
494        if let Some(logging_id) = &statement_logging_id {
495            self.log_set_cluster(*logging_id, target_cluster_id);
496        }
497
498        coord::catalog_serving::check_cluster_restrictions(
499            target_cluster_name.as_str(),
500            &conn_catalog,
501            &plan,
502        )?;
503
504        rbac::check_plan(
505            &conn_catalog,
506            // We can't look at `active_conns` here, but that's ok, because this case was handled
507            // above already inside `Command::ExecuteSideEffectingFunc`.
508            None::<fn(u32) -> Option<RoleId>>,
509            session,
510            &plan,
511            Some(target_cluster_id),
512            &resolved_ids,
513            &sql_impl_ids,
514        )?;
515
516        if let Some((_, wait_future)) =
517            coord::appends::waiting_on_startup_appends(&*catalog, session, &plan)
518        {
519            wait_future.await;
520        }
521
522        let max_query_result_size = Some(session.vars().max_query_result_size());
523
524        // # From sequence_peek
525
526        // # From peek_validate
527
528        let compute_instance_snapshot =
529            ComputeInstanceSnapshot::new_without_collections(cluster.id());
530
531        let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config())
532            .override_from(&catalog.get_cluster(cluster.id()).config.features())
533            .override_from(&explain_ctx);
534
535        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
536            return Err(AdapterError::NoClusterReplicasAvailable {
537                name: cluster.name.clone(),
538                is_managed: cluster.is_managed(),
539            });
540        }
541
542        let (_, view_id) = self.transient_id_gen.allocate_id();
543        let (_, index_id) = self.transient_id_gen.allocate_id();
544
545        let target_replica_name = session.vars().cluster_replica();
546        let mut target_replica = target_replica_name
547            .map(|name| {
548                cluster
549                    .replica_id(name)
550                    .ok_or(AdapterError::UnknownClusterReplica {
551                        cluster_name: cluster.name.clone(),
552                        replica_name: name.to_string(),
553                    })
554            })
555            .transpose()?;
556
557        let source_ids = depends_on;
558        // TODO(peek-seq): validate_timeline_context can be expensive in real scenarios (not in
559        // simple benchmarks), because it traverses transitive dependencies even of indexed views and
560        // materialized views (also traversing their MIR plans).
561        let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?;
562        if matches!(timeline_context, TimelineContext::TimestampIndependent) && contains_temporal? {
563            // If the source IDs are timestamp independent but the query contains temporal functions,
564            // then the timeline context needs to be upgraded to timestamp dependent. This is
565            // required because `source_ids` doesn't contain functions.
566            timeline_context = TimelineContext::TimestampDependent;
567        }
568
569        let notices = coord::sequencer::check_log_reads(
570            &catalog,
571            cluster,
572            &source_ids,
573            &mut target_replica,
574            session.vars(),
575        )?;
576        session.add_notices(notices);
577
578        // # From peek_linearize_timestamp
579
580        let isolation_level = session.vars().transaction_isolation().clone();
581        let timeline = Coordinator::get_timeline(&timeline_context);
582        let needs_linearized_read_ts =
583            Coordinator::needs_linearized_read_ts(&isolation_level, when);
584
585        let oracle_read_ts = match timeline {
586            Some(timeline) if needs_linearized_read_ts => {
587                let oracle = self.ensure_oracle(timeline).await?;
588                let oracle_read_ts = oracle.read_ts().await;
589                Some(oracle_read_ts)
590            }
591            Some(_) | None => None,
592        };
593
594        // # From peek_real_time_recency
595
596        let vars = session.vars();
597        let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
598            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
599            && !session.contains_read_timestamp()
600        {
601            // Only call the coordinator when we actually need real-time recency
602            self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
603                source_ids: source_ids.clone(),
604                real_time_recency_timeout: *vars.real_time_recency_timeout(),
605                tx,
606            })
607            .await?
608        } else {
609            None
610        };
611
612        // # From peek_timestamp_read_hold
613
614        let dataflow_builder =
615            DataflowBuilder::new(catalog.state(), compute_instance_snapshot.clone());
616        let input_id_bundle = dataflow_builder.sufficient_collections(source_ids.clone());
617
618        // ## From sequence_peek_timestamp
619
620        // Warning: This will be false for AS OF queries, even if we are otherwise inside a
621        // multi-statement transaction. (It's also false for FreshestTableWrite, which is currently
622        // only read-then-write queries, which can't be part of multi-statement transactions, so
623        // FreshestTableWrite doesn't matter.)
624        //
625        // TODO(peek-seq): It's not totally clear to me what the intended semantics are for AS OF
626        // queries inside a transaction: We clearly can't use the transaction timestamp, but the old
627        // peek sequencing still does a timedomain validation. The new peek sequencing does not do
628        // timedomain validation for AS OF queries, which seems more natural. But I'm thinking that
629        // it would be the cleanest to just simply disallow AS OF queries inside transactions.
630        let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when)
631            && !matches!(query_plan, QueryPlan::Subscribe { .. });
632
633        // Fetch or generate a timestamp for this query and fetch or acquire read holds.
634        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
635            // Use the transaction's timestamp if it exists and this isn't an AS OF query.
636            // (`in_immediate_multi_stmt_txn` is false for AS OF queries.)
637            Some(
638                determination @ TimestampDetermination {
639                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
640                    ..
641                },
642            ) if in_immediate_multi_stmt_txn => {
643                // This is a subsequent (non-AS OF, non-constant) query in a multi-statement
644                // transaction. We now:
645                // - Validate that the query only accesses collections within the transaction's
646                //   timedomain (which we know from the stored read holds).
647                // - Use the transaction's stored timestamp determination.
648                // - Use the (relevant subset of the) transaction's read holds.
649
650                let txn_read_holds_opt = self
651                    .call_coordinator(|tx| Command::GetTransactionReadHoldsBundle {
652                        conn_id: session.conn_id().clone(),
653                        tx,
654                    })
655                    .await;
656
657                if let Some(txn_read_holds) = txn_read_holds_opt {
658                    let allowed_id_bundle = txn_read_holds.id_bundle();
659                    let outside = input_id_bundle.difference(&allowed_id_bundle);
660
661                    // Queries without a timestamp and timeline can belong to any existing timedomain.
662                    if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
663                        let valid_names =
664                            allowed_id_bundle.resolve_names(&*catalog, session.conn_id());
665                        let invalid_names = outside.resolve_names(&*catalog, session.conn_id());
666                        return Err(AdapterError::RelationOutsideTimeDomain {
667                            relations: invalid_names,
668                            names: valid_names,
669                        });
670                    }
671
672                    // Extract the subset of read holds for the collections this query accesses.
673                    let read_holds = txn_read_holds.subset(&input_id_bundle);
674
675                    (determination, read_holds)
676                } else {
677                    // This should never happen: we're in a subsequent query of a multi-statement
678                    // transaction (we have a transaction timestamp), but the coordinator has no
679                    // transaction read holds stored. This indicates a bug in the transaction
680                    // handling.
681                    return Err(AdapterError::Internal(
682                        "Missing transaction read holds for multi-statement transaction"
683                            .to_string(),
684                    ));
685                }
686            }
687            _ => {
688                // There is no timestamp determination yet for this transaction. Either:
689                // - We are not in a multi-statement transaction.
690                // - This is the first (non-AS OF) query in a multi-statement transaction.
691                // - This is an AS OF query.
692                // - This is a constant query (`TimestampContext::NoTimestamp`).
693
694                let timedomain_bundle;
695                let determine_bundle = if in_immediate_multi_stmt_txn {
696                    // This is the first (non-AS OF) query in a multi-statement transaction.
697                    // Determine a timestamp that will be valid for anything in any schema
698                    // referenced by the first query.
699                    timedomain_bundle = timedomain_for(
700                        &*catalog,
701                        &dataflow_builder,
702                        &source_ids,
703                        &timeline_context,
704                        session.conn_id(),
705                        target_cluster_id,
706                    )?;
707                    &timedomain_bundle
708                } else {
709                    // Simply use the inputs of the current query.
710                    &input_id_bundle
711                };
712                let (determination, read_holds) = self
713                    .frontend_determine_timestamp(
714                        session,
715                        determine_bundle,
716                        when,
717                        target_cluster_id,
718                        &timeline_context,
719                        oracle_read_ts,
720                        real_time_recency_ts,
721                    )
722                    .await?;
723
724                // If this is the first (non-AS OF) query in a multi-statement transaction, store
725                // the read holds in the coordinator, so subsequent queries can validate against
726                // them.
727                if in_immediate_multi_stmt_txn {
728                    self.call_coordinator(|tx| Command::StoreTransactionReadHolds {
729                        conn_id: session.conn_id().clone(),
730                        read_holds: read_holds.clone(),
731                        tx,
732                    })
733                    .await;
734                }
735
736                (determination, read_holds)
737            }
738        };
739
740        {
741            // Assert that we have a read hold for all the collections in our `input_id_bundle`.
742            for id in input_id_bundle.iter() {
743                let s = read_holds.storage_holds.contains_key(&id);
744                let c = read_holds
745                    .compute_ids()
746                    .map(|(_instance, coll)| coll)
747                    .contains(&id);
748                soft_assert_or_log!(
749                    s || c,
750                    "missing read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
751                    id,
752                    in_immediate_multi_stmt_txn,
753                );
754            }
755
756            // Assert that each part of the `input_id_bundle` corresponds to the right part of
757            // `read_holds`.
758            for id in input_id_bundle.storage_ids.iter() {
759                soft_assert_or_log!(
760                    read_holds.storage_holds.contains_key(id),
761                    "missing storage read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
762                    id,
763                    in_immediate_multi_stmt_txn,
764                );
765            }
766            for id in input_id_bundle
767                .compute_ids
768                .iter()
769                .flat_map(|(_instance, colls)| colls)
770            {
771                soft_assert_or_log!(
772                    read_holds
773                        .compute_ids()
774                        .map(|(_instance, coll)| coll)
775                        .contains(id),
776                    "missing compute read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
777                    id,
778                    in_immediate_multi_stmt_txn,
779                );
780            }
781        }
782
783        // (TODO(peek-seq): The below TODO is copied from the old peek sequencing. We should resolve
784        // this when we decide what to do with `AS OF` in transactions.)
785        // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
786        // arbitrary and we don't recall why we did it (possibly an error!). Change this to always
787        // set the transaction ops. Decide and document what our policy should be on AS OF queries.
788        // Maybe they shouldn't be allowed in transactions at all because it's hard to explain
789        // what's going on there. This should probably get a small design document.
790
791        // We only track the peeks in the session if the query doesn't use AS
792        // OF or we're inside an explicit transaction. The latter case is
793        // necessary to support PG's `BEGIN` semantics, whose behavior can
794        // depend on whether or not reads have occurred in the txn.
795        let requires_linearization = (&explain_ctx).into();
796        let mut transaction_determination = determination.clone();
797        match query_plan {
798            QueryPlan::Subscribe { .. } => {
799                if when.is_transactional() {
800                    session.add_transaction_ops(TransactionOps::Subscribe)?;
801                }
802            }
803            QueryPlan::Select(..) | QueryPlan::CopyTo(..) => {
804                if when.is_transactional() {
805                    session.add_transaction_ops(TransactionOps::Peeks {
806                        determination: transaction_determination,
807                        cluster_id: target_cluster_id,
808                        requires_linearization,
809                    })?;
810                } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
811                    // If the query uses AS OF, then ignore the timestamp.
812                    transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
813                    session.add_transaction_ops(TransactionOps::Peeks {
814                        determination: transaction_determination,
815                        cluster_id: target_cluster_id,
816                        requires_linearization,
817                    })?;
818                }
819            }
820        }
821
822        // # From peek_optimize
823
824        let stats = statistics_oracle(
825            session,
826            &source_ids,
827            &determination.timestamp_context.antichain(),
828            true,
829            catalog.system_config(),
830            &*self.storage_collections,
831        )
832        .await
833        .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
834
835        // Generate data structures that can be moved to another task where we will perform possibly
836        // expensive optimizations.
837        let timestamp_context = determination.timestamp_context.clone();
838        let session_meta = session.meta();
839        let now = catalog.config().now.clone();
840        let target_cluster_name = target_cluster_name.clone();
841        let needs_plan_insights = explain_ctx.needs_plan_insights();
842        let determination_for_pushdown = if matches!(explain_ctx, ExplainContext::Pushdown) {
843            // This is a hairy data structure, so avoid this clone if we are not in
844            // EXPLAIN FILTER PUSHDOWN.
845            Some(determination.clone())
846        } else {
847            None
848        };
849
850        let span = Span::current();
851
852        // Prepare data for plan insights if needed
853        let catalog_for_insights = if needs_plan_insights {
854            Some(Arc::clone(&catalog))
855        } else {
856            None
857        };
858        let mut compute_instances = BTreeMap::new();
859        if needs_plan_insights {
860            for user_cluster in catalog.user_clusters() {
861                let snapshot = ComputeInstanceSnapshot::new_without_collections(user_cluster.id);
862                compute_instances.insert(user_cluster.name.clone(), snapshot);
863            }
864        }
865
866        let source_ids_for_closure = source_ids.clone();
867
868        let optimization_future: JoinHandle<Result<_, AdapterError>> = match query_plan {
869            QueryPlan::CopyTo(select_plan, mut copy_to_ctx) => {
870                let raw_expr = select_plan.source.clone();
871
872                // COPY TO path: calculate output_batch_count and create copy_to optimizer
873                let worker_counts = cluster.replicas().map(|r| {
874                    let loc = &r.config.location;
875                    loc.workers().unwrap_or_else(|| loc.num_processes())
876                });
877                let max_worker_count = match worker_counts.max() {
878                    Some(count) => u64::cast_from(count),
879                    None => {
880                        return Err(AdapterError::NoClusterReplicasAvailable {
881                            name: cluster.name.clone(),
882                            is_managed: cluster.is_managed(),
883                        });
884                    }
885                };
886                copy_to_ctx.output_batch_count = Some(max_worker_count);
887
888                let mut optimizer = optimize::copy_to::Optimizer::new(
889                    Arc::clone(&catalog),
890                    compute_instance_snapshot,
891                    view_id,
892                    copy_to_ctx,
893                    optimizer_config,
894                    self.optimizer_metrics.clone(),
895                );
896
897                mz_ore::task::spawn_blocking(
898                    || "optimize copy-to",
899                    move || {
900                        span.in_scope(|| {
901                            let _dispatch_guard = explain_ctx.dispatch_guard();
902
903                            // COPY TO path
904                            // HIR ⇒ MIR lowering and MIR optimization (local)
905                            let local_mir_plan =
906                                optimizer.catch_unwind_optimize(raw_expr.clone())?;
907                            // Attach resolved context required to continue the pipeline.
908                            let local_mir_plan = local_mir_plan.resolve(
909                                timestamp_context.clone(),
910                                &session_meta,
911                                stats,
912                            );
913                            // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
914                            let global_lir_plan =
915                                optimizer.catch_unwind_optimize(local_mir_plan)?;
916                            Ok(Execution::CopyToS3 {
917                                global_lir_plan,
918                                source_ids: source_ids_for_closure,
919                            })
920                        })
921                    },
922                )
923            }
924            QueryPlan::Select(select_plan) => {
925                let select_plan = select_plan.clone();
926                let raw_expr = select_plan.source.clone();
927
928                // SELECT/EXPLAIN path: create peek optimizer
929                let mut optimizer = optimize::peek::Optimizer::new(
930                    Arc::clone(&catalog),
931                    compute_instance_snapshot,
932                    select_plan.finishing.clone(),
933                    view_id,
934                    index_id,
935                    optimizer_config,
936                    self.optimizer_metrics.clone(),
937                );
938
939                mz_ore::task::spawn_blocking(
940                    || "optimize peek",
941                    move || {
942                        span.in_scope(|| {
943                            let _dispatch_guard = explain_ctx.dispatch_guard();
944
945                            // SELECT/EXPLAIN path
946                            // HIR ⇒ MIR lowering and MIR optimization (local)
947
948                            // The purpose of wrapping the following in a closure is to control where the
949                            // `?`s return from, so that even when a `catch_unwind_optimize` call fails,
950                            // we can still handle `EXPLAIN BROKEN`.
951                            let pipeline = || {
952                                let local_mir_plan =
953                                    optimizer.catch_unwind_optimize(raw_expr.clone())?;
954                                // Attach resolved context required to continue the pipeline.
955                                let local_mir_plan = local_mir_plan.resolve(
956                                    timestamp_context.clone(),
957                                    &session_meta,
958                                    stats,
959                                );
960                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
961                                let global_lir_plan =
962                                    optimizer.catch_unwind_optimize(local_mir_plan)?;
963                                Ok::<_, AdapterError>(global_lir_plan)
964                            };
965
966                            let global_lir_plan_result = pipeline();
967                            let optimization_finished_at = now();
968
                            // Builds the `PlanInsightsContext` that accompanies EXPLAIN /
                            // plan-insights results. Returns `None` when plan insights were not
                            // requested (`needs_plan_insights` is false) or when no catalog
                            // snapshot was captured for them.
                            //
                            // `is_notice` is true when only the plan-insights *notice* is wanted
                            // (`ExplainContext::PlanInsightsNotice`), in which case expensive
                            // re-optimization may be skipped if optimization already took long.
                            let create_insights_ctx =
                                |optimizer: &optimize::peek::Optimizer,
                                 is_notice: bool|
                                 -> Option<Box<PlanInsightsContext>> {
                                    if !needs_plan_insights {
                                        return None;
                                    }

                                    // Populated iff `needs_plan_insights` (cloned before this
                                    // optimization task was spawned), so the `?` should not
                                    // trigger after the check above.
                                    let catalog = catalog_for_insights.as_ref()?;

                                    // NOTE(review): `needs_plan_insights` is always true here
                                    // thanks to the early return above, so the `else { false }`
                                    // branch is unreachable; it appears to be kept defensively.
                                    let enable_re_optimize = if needs_plan_insights {
                                        // Disable any plan insights that use the optimizer if we only want the
                                        // notice and plan optimization took longer than the threshold. This is
                                        // to prevent a situation where optimizing takes a while and there are
                                        // lots of clusters, which would delay peek execution by the product of
                                        // those.
                                        //
                                        // (This heuristic doesn't work well, see #9492.)
                                        let dyncfgs = catalog.system_config().dyncfgs();
                                        let opt_limit = mz_adapter_types::dyncfgs
                                        ::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
                                            .get(dyncfgs);
                                        !(is_notice && optimizer.duration() > opt_limit)
                                    } else {
                                        false
                                    };

                                    Some(Box::new(PlanInsightsContext {
                                        // Original SELECT AST, if one was retained on the plan.
                                        stmt: select_plan
                                            .select
                                            .as_deref()
                                            .map(Clone::clone)
                                            .map(Statement::Select),
                                        raw_expr: raw_expr.clone(),
                                        catalog: Arc::clone(catalog),
                                        compute_instances,
                                        target_instance: target_cluster_name,
                                        metrics: optimizer.metrics().clone(),
                                        finishing: optimizer.finishing().clone(),
                                        optimizer_config: optimizer.config().clone(),
                                        session: session_meta,
                                        timestamp_context,
                                        view_id: optimizer.select_id(),
                                        index_id: optimizer.index_id(),
                                        enable_re_optimize,
                                    }))
                                };
1016
1017                            let global_lir_plan = match global_lir_plan_result {
1018                                Ok(plan) => plan,
1019                                Err(err) => {
1020                                    let result = if let ExplainContext::Plan(explain_ctx) =
1021                                        explain_ctx
1022                                        && explain_ctx.broken
1023                                    {
1024                                        // EXPLAIN BROKEN: log error and continue with defaults
1025                                        tracing::error!(
1026                                            "error while handling EXPLAIN statement: {}",
1027                                            err
1028                                        );
1029                                        Ok(Execution::ExplainPlan {
1030                                            df_meta: Default::default(),
1031                                            explain_ctx,
1032                                            optimizer,
1033                                            insights_ctx: None,
1034                                        })
1035                                    } else {
1036                                        Err(err)
1037                                    };
1038                                    return result;
1039                                }
1040                            };
1041
1042                            match explain_ctx {
1043                                ExplainContext::Plan(explain_ctx) => {
1044                                    let (_, df_meta, _) = global_lir_plan.unapply();
1045                                    let insights_ctx = create_insights_ctx(&optimizer, false);
1046                                    Ok(Execution::ExplainPlan {
1047                                        df_meta,
1048                                        explain_ctx,
1049                                        optimizer,
1050                                        insights_ctx,
1051                                    })
1052                                }
1053                                ExplainContext::None => Ok(Execution::Peek {
1054                                    global_lir_plan,
1055                                    optimization_finished_at,
1056                                    plan_insights_optimizer_trace: None,
1057                                    finishing: select_plan.finishing,
1058                                    copy_to: select_plan.copy_to,
1059                                    insights_ctx: None,
1060                                }),
1061                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
1062                                    let insights_ctx = create_insights_ctx(&optimizer, true);
1063                                    Ok(Execution::Peek {
1064                                        global_lir_plan,
1065                                        optimization_finished_at,
1066                                        plan_insights_optimizer_trace: Some(optimizer_trace),
1067                                        finishing: select_plan.finishing,
1068                                        copy_to: select_plan.copy_to,
1069                                        insights_ctx,
1070                                    })
1071                                }
1072                                ExplainContext::Pushdown => {
1073                                    let (plan, _, _) = global_lir_plan.unapply();
1074                                    let imports = match plan {
1075                                        PeekPlan::SlowPath(plan) => plan
1076                                            .desc
1077                                            .source_imports
1078                                            .into_iter()
1079                                            .filter_map(|(id, import)| {
1080                                                import.desc.arguments.operators.map(|mfp| (id, mfp))
1081                                            })
1082                                            .collect(),
1083                                        PeekPlan::FastPath(_) => {
1084                                            std::collections::BTreeMap::default()
1085                                        }
1086                                    };
1087                                    Ok(Execution::ExplainPushdown {
1088                                        imports,
1089                                        determination: determination_for_pushdown
1090                                            .expect("it's present for the ExplainPushdown case"),
1091                                    })
1092                                }
1093                            }
1094                        })
1095                    },
1096                )
1097            }
1098            QueryPlan::Subscribe(plan) => {
1099                let plan = plan.clone();
1100                let catalog: Arc<Catalog> = Arc::clone(&catalog);
1101                let debug_name = format!("subscribe-{}", index_id);
1102                let mut optimizer = optimize::subscribe::Optimizer::new(
1103                    catalog,
1104                    compute_instance_snapshot.clone(),
1105                    view_id,
1106                    index_id,
1107                    plan.with_snapshot,
1108                    plan.up_to,
1109                    debug_name,
1110                    optimizer_config,
1111                    self.optimizer_metrics.clone(),
1112                );
1113                mz_ore::task::spawn_blocking(
1114                    || "optimize subscribe",
1115                    move || {
1116                        span.in_scope(|| {
1117                            let _dispatch_guard = explain_ctx.dispatch_guard();
1118
1119                            let global_mir_plan = optimizer.catch_unwind_optimize(plan.clone())?;
1120                            let as_of = timestamp_context.timestamp_or_default();
1121
1122                            if let Some(up_to) = optimizer.up_to() {
1123                                if as_of > up_to {
1124                                    return Err(AdapterError::AbsurdSubscribeBounds {
1125                                        as_of,
1126                                        up_to,
1127                                    });
1128                                }
1129                            }
1130                            let local_mir_plan =
1131                                global_mir_plan.resolve(Antichain::from_elem(as_of));
1132
1133                            let global_lir_plan =
1134                                optimizer.catch_unwind_optimize(local_mir_plan)?;
1135                            let optimization_finished_at = now();
1136
1137                            let (df_desc, df_meta) = global_lir_plan.unapply();
1138                            Ok(Execution::Subscribe {
1139                                subscribe_plan: plan,
1140                                df_desc,
1141                                df_meta,
1142                                optimization_finished_at,
1143                            })
1144                        })
1145                    },
1146                )
1147            }
1148        };
1149
1150        let mut optimization_timeout = *session.vars().statement_timeout();
1151        // Timeout of 0 is equivalent to "off", meaning we will wait "forever."
1152        if optimization_timeout == Duration::ZERO {
1153            optimization_timeout = Duration::MAX;
1154        }
1155        let optimization_result =
1156            // Note: spawn_blocking tasks cannot be cancelled, so on timeout we stop waiting but the
1157            // optimization task continues running in the background until completion. See
1158            // https://github.com/MaterializeInc/database-issues/issues/8644 for properly cancelling
1159            // optimizer runs.
1160            match tokio::time::timeout(optimization_timeout, optimization_future).await {
1161                Ok(Ok(result)) => result,
1162                Ok(Err(AdapterError::Optimizer(err))) => {
1163                    return Err(AdapterError::Internal(format!(
1164                        "internal error in optimizer: {}",
1165                        err
1166                    )));
1167                }
1168                Ok(Err(err)) => {
1169                    return Err(err);
1170                }
1171                Err(_elapsed) => {
1172                    warn!("optimize peek timed out after {:?}", optimization_timeout);
1173                    return Err(AdapterError::StatementTimeout);
1174                }
1175            };
1176
1177        // Log optimization finished
1178        if let Some(logging_id) = &statement_logging_id {
1179            self.log_lifecycle_event(*logging_id, StatementLifecycleEvent::OptimizationFinished);
1180        }
1181
1182        // Assert that read holds are correct for the execution plan
1183        Self::assert_read_holds_correct(
1184            &read_holds,
1185            &optimization_result,
1186            &determination,
1187            target_cluster_id,
1188            in_immediate_multi_stmt_txn,
1189        );
1190
1191        // Handle the optimization result: either generate EXPLAIN output or continue with execution
1192        match optimization_result {
1193            Execution::ExplainPlan {
1194                df_meta,
1195                explain_ctx,
1196                optimizer,
1197                insights_ctx,
1198            } => {
1199                let rows = coord::sequencer::explain_plan_inner(
1200                    session,
1201                    &catalog,
1202                    df_meta,
1203                    explain_ctx,
1204                    optimizer,
1205                    insights_ctx,
1206                )
1207                .await?;
1208
1209                Ok(Some(ExecuteResponse::SendingRowsImmediate {
1210                    rows: Box::new(rows.into_row_iter()),
1211                }))
1212            }
1213            Execution::ExplainPushdown {
1214                imports,
1215                determination,
1216            } => {
1217                // # From peek_explain_pushdown
1218
1219                let as_of = determination.timestamp_context.antichain();
1220                let mz_now = determination
1221                    .timestamp_context
1222                    .timestamp()
1223                    .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1224                    .unwrap_or_else(ResultSpec::value_all);
1225
1226                Ok(Some(
1227                    coord::sequencer::explain_pushdown_future_inner(
1228                        session,
1229                        &*catalog,
1230                        &self.storage_collections,
1231                        as_of,
1232                        mz_now,
1233                        imports,
1234                    )
1235                    .await
1236                    .await?,
1237                ))
1238            }
1239            Execution::Peek {
1240                global_lir_plan,
1241                optimization_finished_at: _optimization_finished_at,
1242                plan_insights_optimizer_trace,
1243                finishing,
1244                copy_to,
1245                insights_ctx,
1246            } => {
1247                // Continue with normal execution
1248                // # From peek_finish
1249
1250                // The typ here was generated from the HIR SQL type and simply stored in LIR.
1251                let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
1252
1253                coord::sequencer::emit_optimizer_notices(
1254                    &*catalog,
1255                    session,
1256                    &df_meta.optimizer_notices,
1257                );
1258
1259                // Generate plan insights notice if needed
1260                if let Some(trace) = plan_insights_optimizer_trace {
1261                    let target_cluster = catalog.get_cluster(target_cluster_id);
1262                    let features = OptimizerFeatures::from(catalog.system_config())
1263                        .override_from(&target_cluster.config.features());
1264                    let insights = trace
1265                        .into_plan_insights(
1266                            &features,
1267                            &catalog.for_session(session),
1268                            Some(finishing.clone()),
1269                            Some(target_cluster),
1270                            df_meta.clone(),
1271                            insights_ctx,
1272                        )
1273                        .await?;
1274                    session.add_notice(AdapterNotice::PlanInsights(insights));
1275                }
1276
1277                // # Now back to peek_finish
1278
1279                let watch_set = statement_logging_id.map(|logging_id| {
1280                    WatchSetCreation::new(
1281                        logging_id,
1282                        catalog.state(),
1283                        &input_id_bundle,
1284                        determination.timestamp_context.timestamp_or_default(),
1285                    )
1286                });
1287
1288                let max_result_size = catalog.system_config().max_result_size();
1289
1290                // Clone determination if we need it for emit_timestamp_notice, since it may be
1291                // moved into Command::ExecuteSlowPathPeek.
1292                let determination_for_notice = if session.vars().emit_timestamp_notice() {
1293                    Some(determination.clone())
1294                } else {
1295                    None
1296                };
1297
1298                let response = match peek_plan {
1299                    PeekPlan::FastPath(fast_path_plan) => {
1300                        if let Some(logging_id) = &statement_logging_id {
1301                            // TODO(peek-seq): Actually, we should log it also for
1302                            // FastPathPlan::Constant. The only reason we are not doing so at the
1303                            // moment is to match the old peek sequencing, so that statement logging
1304                            // tests pass with the frontend peek sequencing turned both on and off.
1305                            //
1306                            // When the old sequencing is removed, we should make a couple of
1307                            // changes in how we log timestamps:
1308                            // - Move this up to just after timestamp determination, so that it
1309                            //   appears in the log as soon as possible.
1310                            // - Do it also for Constant peeks.
1311                            // - Currently, slow-path peeks' timestamp logging is done by
1312                            //   `implement_peek_plan`. We could remove it from there, and just do
1313                            //   it here.
1314                            if !matches!(fast_path_plan, FastPathPlan::Constant(..)) {
1315                                self.log_set_timestamp(
1316                                    *logging_id,
1317                                    determination.timestamp_context.timestamp_or_default(),
1318                                );
1319                            }
1320                        }
1321
1322                        let row_set_finishing_seconds =
1323                            session.metrics().row_set_finishing_seconds().clone();
1324
1325                        let peek_stash_read_batch_size_bytes =
1326                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
1327                                .get(catalog.system_config().dyncfgs());
1328                        let peek_stash_read_memory_budget_bytes =
1329                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
1330                                .get(catalog.system_config().dyncfgs());
1331
1332                        self.implement_fast_path_peek_plan(
1333                            fast_path_plan,
1334                            determination.timestamp_context.timestamp_or_default(),
1335                            finishing,
1336                            target_cluster_id,
1337                            target_replica,
1338                            typ,
1339                            max_result_size,
1340                            max_query_result_size,
1341                            row_set_finishing_seconds,
1342                            read_holds,
1343                            peek_stash_read_batch_size_bytes,
1344                            peek_stash_read_memory_budget_bytes,
1345                            session.conn_id().clone(),
1346                            source_ids,
1347                            watch_set,
1348                        )
1349                        .await?
1350                    }
1351                    PeekPlan::SlowPath(dataflow_plan) => {
1352                        if let Some(logging_id) = &statement_logging_id {
1353                            self.log_set_transient_index_id(*logging_id, dataflow_plan.id);
1354                        }
1355
1356                        self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
1357                            dataflow_plan: Box::new(dataflow_plan),
1358                            determination,
1359                            finishing,
1360                            compute_instance: target_cluster_id,
1361                            target_replica,
1362                            intermediate_result_type: typ,
1363                            source_ids,
1364                            conn_id: session.conn_id().clone(),
1365                            max_result_size,
1366                            max_query_result_size,
1367                            watch_set,
1368                            tx,
1369                        })
1370                        .await?
1371                    }
1372                };
1373
1374                // Add timestamp notice if emit_timestamp_notice is enabled
1375                if let Some(determination) = determination_for_notice {
1376                    let explanation = self
1377                        .call_coordinator(|tx| Command::ExplainTimestamp {
1378                            conn_id: session.conn_id().clone(),
1379                            session_wall_time: session.pcx().wall_time,
1380                            cluster_id: target_cluster_id,
1381                            id_bundle: input_id_bundle.clone(),
1382                            determination,
1383                            tx,
1384                        })
1385                        .await;
1386                    session.add_notice(AdapterNotice::QueryTimestamp { explanation });
1387                }
1388
1389                Ok(Some(match copy_to {
1390                    None => response,
1391                    // COPY TO STDOUT
1392                    Some(format) => ExecuteResponse::CopyTo {
1393                        format,
1394                        resp: Box::new(response),
1395                    },
1396                }))
1397            }
1398            Execution::Subscribe {
1399                subscribe_plan,
1400                df_desc,
1401                df_meta,
1402                optimization_finished_at: _optimization_finished_at,
1403            } => {
1404                if df_desc.as_of.as_ref().expect("as of set") == &df_desc.until {
1405                    session.add_notice(AdapterNotice::EqualSubscribeBounds {
1406                        bound: *df_desc.until.as_option().expect("as of set"),
1407                    });
1408                }
1409                coord::sequencer::emit_optimizer_notices(
1410                    &*catalog,
1411                    session,
1412                    &df_meta.optimizer_notices,
1413                );
1414
1415                let response = self
1416                    .call_coordinator(|tx| Command::ExecuteSubscribe {
1417                        df_desc,
1418                        dependency_ids: subscribe_plan.from.depends_on(),
1419                        cluster_id: target_cluster_id,
1420                        replica_id: target_replica,
1421                        conn_id: session.conn_id().clone(),
1422                        session_uuid: session.uuid(),
1423                        read_holds,
1424                        plan: subscribe_plan,
1425                        statement_logging_id,
1426                        tx,
1427                    })
1428                    .await?;
1429                Ok(Some(response))
1430            }
1431            Execution::CopyToS3 {
1432                global_lir_plan,
1433                source_ids,
1434            } => {
1435                let (df_desc, df_meta) = global_lir_plan.unapply();
1436
1437                coord::sequencer::emit_optimizer_notices(
1438                    &*catalog,
1439                    session,
1440                    &df_meta.optimizer_notices,
1441                );
1442
1443                // Extract S3 sink connection info for preflight check
1444                let sink_id = df_desc.sink_id();
1445                let sinks = &df_desc.sink_exports;
1446                if sinks.len() != 1 {
1447                    return Err(AdapterError::Internal(
1448                        "expected exactly one copy to s3 sink".into(),
1449                    ));
1450                }
1451                let (_, sink_desc) = sinks
1452                    .first_key_value()
1453                    .expect("known to be exactly one copy to s3 sink");
1454                let s3_sink_connection = match &sink_desc.connection {
1455                    mz_compute_types::sinks::ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1456                        conn.clone()
1457                    }
1458                    _ => {
1459                        return Err(AdapterError::Internal(
1460                            "expected copy to s3 oneshot sink".into(),
1461                        ));
1462                    }
1463                };
1464
1465                // Perform S3 preflight check in background task (via coordinator).
1466                // This runs slow S3 operations without blocking the coordinator's main task.
1467                self.call_coordinator(|tx| Command::CopyToPreflight {
1468                    s3_sink_connection,
1469                    sink_id,
1470                    tx,
1471                })
1472                .await?;
1473
1474                // Preflight succeeded, now execute the actual COPY TO dataflow
1475                let watch_set = statement_logging_id.map(|logging_id| {
1476                    WatchSetCreation::new(
1477                        logging_id,
1478                        catalog.state(),
1479                        &input_id_bundle,
1480                        determination.timestamp_context.timestamp_or_default(),
1481                    )
1482                });
1483
1484                let response = self
1485                    .call_coordinator(|tx| Command::ExecuteCopyTo {
1486                        df_desc: Box::new(df_desc),
1487                        compute_instance: target_cluster_id,
1488                        target_replica,
1489                        source_ids,
1490                        conn_id: session.conn_id().clone(),
1491                        watch_set,
1492                        tx,
1493                    })
1494                    .await?;
1495
1496                Ok(Some(response))
1497            }
1498        }
1499    }
1500
1501    /// (Similar to Coordinator::determine_timestamp)
1502    /// Determines the timestamp for a query, acquires read holds that ensure the
1503    /// query remains executable at that time, and returns those.
1504    /// The caller is responsible for eventually dropping those read holds.
1505    ///
1506    /// Note: self is taken &mut because of the lazy fetching in `get_compute_instance_client`.
1507    pub(crate) async fn frontend_determine_timestamp(
1508        &mut self,
1509        session: &Session,
1510        id_bundle: &CollectionIdBundle,
1511        when: &QueryWhen,
1512        compute_instance: ComputeInstanceId,
1513        timeline_context: &TimelineContext,
1514        oracle_read_ts: Option<Timestamp>,
1515        real_time_recency_ts: Option<Timestamp>,
1516    ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
1517        // this is copy-pasted from Coordinator
1518
1519        let isolation_level = session.vars().transaction_isolation();
1520
1521        let (read_holds, upper) = self
1522            .acquire_read_holds_and_least_valid_write(id_bundle)
1523            .await
1524            .map_err(|err| {
1525                AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
1526                    err,
1527                    compute_instance,
1528                )
1529            })?;
1530        let (det, read_holds) = <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1531            session,
1532            id_bundle,
1533            when,
1534            timeline_context,
1535            oracle_read_ts,
1536            real_time_recency_ts,
1537            isolation_level,
1538            read_holds,
1539            upper.clone(),
1540        )?;
1541
1542        session
1543            .metrics()
1544            .determine_timestamp(&[
1545                match det.respond_immediately() {
1546                    true => "true",
1547                    false => "false",
1548                },
1549                isolation_level.as_str(),
1550                &compute_instance.to_string(),
1551            ])
1552            .inc();
1553        if !det.respond_immediately()
1554            && isolation_level == &IsolationLevel::StrictSerializable
1555            && real_time_recency_ts.is_none()
1556        {
1557            // Note down the difference between StrictSerializable and Serializable into a metric.
1558            if let Some(strict) = det.timestamp_context.timestamp() {
1559                let (serializable_det, _tmp_read_holds) =
1560                    <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1561                        session,
1562                        id_bundle,
1563                        when,
1564                        timeline_context,
1565                        oracle_read_ts,
1566                        real_time_recency_ts,
1567                        &IsolationLevel::Serializable,
1568                        read_holds.clone(),
1569                        upper,
1570                    )?;
1571                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
1572                    session
1573                        .metrics()
1574                        .timestamp_difference_for_strict_serializable_ms(&[compute_instance
1575                            .to_string()
1576                            .as_ref()])
1577                        .observe(f64::cast_lossy(u64::from(
1578                            strict.saturating_sub(*serializable),
1579                        )));
1580                }
1581            }
1582        }
1583
1584        Ok((det, read_holds))
1585    }
1586
1587    fn assert_read_holds_correct(
1588        read_holds: &ReadHolds,
1589        execution: &Execution,
1590        determination: &TimestampDetermination,
1591        target_cluster_id: ClusterId,
1592        in_immediate_multi_stmt_txn: bool,
1593    ) {
1594        // Extract source_imports, index_imports, as_of, and execution_name based on Execution variant
1595        let (source_imports, index_imports, as_of, execution_name): (
1596            Vec<GlobalId>,
1597            Vec<GlobalId>,
1598            Timestamp,
1599            &str,
1600        ) = match execution {
1601            Execution::Peek {
1602                global_lir_plan, ..
1603            } => match global_lir_plan.peek_plan() {
1604                PeekPlan::FastPath(fast_path_plan) => {
1605                    let (sources, indexes) = match fast_path_plan {
1606                        FastPathPlan::Constant(..) => (vec![], vec![]),
1607                        FastPathPlan::PeekExisting(_coll_id, idx_id, ..) => (vec![], vec![*idx_id]),
1608                        FastPathPlan::PeekPersist(global_id, ..) => (vec![*global_id], vec![]),
1609                    };
1610                    (
1611                        sources,
1612                        indexes,
1613                        determination.timestamp_context.timestamp_or_default(),
1614                        "FastPath",
1615                    )
1616                }
1617                PeekPlan::SlowPath(dataflow_plan) => {
1618                    let as_of = dataflow_plan
1619                        .desc
1620                        .as_of
1621                        .clone()
1622                        .expect("dataflow has an as_of")
1623                        .into_element();
1624                    (
1625                        dataflow_plan.desc.source_imports.keys().cloned().collect(),
1626                        dataflow_plan.desc.index_imports.keys().cloned().collect(),
1627                        as_of,
1628                        "SlowPath",
1629                    )
1630                }
1631            },
1632            Execution::CopyToS3 {
1633                global_lir_plan, ..
1634            } => {
1635                let df_desc = global_lir_plan.df_desc();
1636                let as_of = df_desc
1637                    .as_of
1638                    .clone()
1639                    .expect("dataflow has an as_of")
1640                    .into_element();
1641                (
1642                    df_desc.source_imports.keys().cloned().collect(),
1643                    df_desc.index_imports.keys().cloned().collect(),
1644                    as_of,
1645                    "CopyToS3",
1646                )
1647            }
1648            Execution::ExplainPlan { .. } | Execution::ExplainPushdown { .. } => {
1649                // No read holds assertions needed for EXPLAIN variants
1650                return;
1651            }
1652            Execution::Subscribe { df_desc, .. } => {
1653                let as_of = df_desc
1654                    .as_of
1655                    .clone()
1656                    .expect("dataflow has an as_of")
1657                    .into_element();
1658                (
1659                    df_desc.source_imports.keys().cloned().collect(),
1660                    df_desc.index_imports.keys().cloned().collect(),
1661                    as_of,
1662                    "Subscribe",
1663                )
1664            }
1665        };
1666
1667        // Assert that we have some read holds for all the imports of the dataflow.
1668        for id in source_imports.iter() {
1669            soft_assert_or_log!(
1670                read_holds.storage_holds.contains_key(id),
1671                "[{}] missing read hold for the source import {}; (in_immediate_multi_stmt_txn: {})",
1672                execution_name,
1673                id,
1674                in_immediate_multi_stmt_txn,
1675            );
1676        }
1677        for id in index_imports.iter() {
1678            soft_assert_or_log!(
1679                read_holds
1680                    .compute_ids()
1681                    .map(|(_instance, coll)| coll)
1682                    .contains(id),
1683                "[{}] missing read hold for the index import {}; (in_immediate_multi_stmt_txn: {})",
1684                execution_name,
1685                id,
1686                in_immediate_multi_stmt_txn,
1687            );
1688        }
1689
1690        // Also check the holds against the as_of.
1691        for (id, h) in read_holds.storage_holds.iter() {
1692            soft_assert_or_log!(
1693                h.since().less_equal(&as_of),
1694                "[{}] storage read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1695                execution_name,
1696                h.since(),
1697                id,
1698                as_of,
1699                determination,
1700                in_immediate_multi_stmt_txn,
1701            );
1702        }
1703        for ((instance, id), h) in read_holds.compute_holds.iter() {
1704            soft_assert_eq_or_log!(
1705                *instance,
1706                target_cluster_id,
1707                "[{}] the read hold on {} is on the wrong cluster; (in_immediate_multi_stmt_txn: {})",
1708                execution_name,
1709                id,
1710                in_immediate_multi_stmt_txn,
1711            );
1712            soft_assert_or_log!(
1713                h.since().less_equal(&as_of),
1714                "[{}] compute read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1715                execution_name,
1716                h.since(),
1717                id,
1718                as_of,
1719                determination,
1720                in_immediate_multi_stmt_txn,
1721            );
1722        }
1723    }
1724}
1725
/// Enum for branching among various execution steps after optimization
enum Execution {
    /// Execute a peek (SELECT, or COPY ... TO STDOUT when `copy_to` is set),
    /// via either a fast-path or slow-path (dataflow) plan.
    Peek {
        /// Fully optimized LIR plan; unapplied into the peek plan, dataflow
        /// metainfo, and intermediate result type.
        global_lir_plan: optimize::peek::GlobalLirPlan,
        /// Wall-clock time at which optimization finished.
        optimization_finished_at: EpochMillis,
        /// When present, turned into a `PlanInsights` notice for the session.
        plan_insights_optimizer_trace: Option<OptimizerTrace>,
        /// Row-set finishing (e.g., ORDER BY/LIMIT processing) applied to the
        /// peek results.
        finishing: RowSetFinishing,
        /// `Some(format)` wraps the response in `ExecuteResponse::CopyTo`
        /// (COPY TO STDOUT); `None` returns the peek response directly.
        copy_to: Option<plan::CopyFormat>,
        /// Extra context for computing plan insights.
        insights_ctx: Option<Box<PlanInsightsContext>>,
    },
    /// Execute a SUBSCRIBE by shipping the optimized dataflow to the
    /// coordinator (`Command::ExecuteSubscribe`).
    Subscribe {
        /// The original SUBSCRIBE plan (used for dependency ids and passed on
        /// to the coordinator).
        subscribe_plan: SubscribePlan,
        /// Optimized dataflow to install; its `as_of`/`until` are compared to
        /// emit an `EqualSubscribeBounds` notice.
        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
        /// Metainfo whose optimizer notices are emitted to the session.
        df_meta: DataflowMetainfo,
        /// Wall-clock time at which optimization finished.
        optimization_finished_at: EpochMillis,
    },
    /// Execute COPY ... TO an S3 sink: run a preflight check, then the
    /// one-shot sink dataflow (`Command::ExecuteCopyTo`).
    CopyToS3 {
        /// Optimized dataflow containing exactly one `CopyToS3Oneshot` sink.
        global_lir_plan: optimize::copy_to::GlobalLirPlan,
        /// Ids of the collections the dataflow reads.
        source_ids: BTreeSet<GlobalId>,
    },
    /// EXPLAIN of a plan; skipped by read-hold assertions (no collections are
    /// read at execution time).
    ExplainPlan {
        df_meta: DataflowMetainfo,
        explain_ctx: ExplainPlanContext,
        optimizer: optimize::peek::Optimizer,
        insights_ctx: Option<Box<PlanInsightsContext>>,
    },
    /// EXPLAIN FILTER PUSHDOWN; likewise skipped by read-hold assertions.
    ExplainPushdown {
        /// Per-collection `MapFilterProject`s whose pushdown is explained.
        imports: BTreeMap<GlobalId, mz_expr::MapFilterProject>,
        /// Timestamp determination for the explained query.
        determination: TimestampDetermination,
    },
}