// mz_adapter/frontend_peek.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::sync::Arc;

use itertools::Itertools;
use mz_adapter_types::dyncfgs::ENABLE_FRONTEND_SUBSCRIBES;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::DataflowDescription;
use mz_controller_types::ClusterId;
use mz_expr::{CollectionPlan, ResultSpec, RowSetFinishing};
use mz_ore::cast::{CastFrom, CastLossy};
use mz_ore::collections::CollectionExt;
use mz_ore::now::EpochMillis;
use mz_ore::task::JoinHandle;
use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
use mz_repr::role_id::RoleId;
use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
use mz_sql::ast::Raw;
use mz_sql::catalog::CatalogCluster;
use mz_sql::plan::Params;
use mz_sql::plan::{
    self, Explainee, ExplaineeStatement, Plan, QueryWhen, SelectPlan, SubscribePlan,
};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::IsolationLevel;
use mz_sql_parser::ast::{CopyDirection, ExplainStage, ShowStatement, Statement};
use mz_transform::EmptyStatisticsOracle;
use mz_transform::dataflow::DataflowMetainfo;
use opentelemetry::trace::TraceContextExt;
use timely::progress::Antichain;
use tracing::{Span, debug, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::catalog::Catalog;
use crate::command::Command;
use crate::coord::peek::{FastPathPlan, PeekPlan};
use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle};
use crate::coord::timeline::timedomain_for;
use crate::coord::timestamp_selection::TimestampDetermination;
use crate::coord::{
    Coordinator, CopyToContext, ExecuteContextGuard, ExplainContext, ExplainPlanContext,
    TargetCluster,
};
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::OptimizerTrace;
use crate::optimize::Optimize;
use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
use crate::session::{Session, TransactionOps, TransactionStatus};
use crate::statement_logging::WatchSetCreation;
use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
use crate::{
    AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, PeekClient, ReadHolds,
    TimelineContext, TimestampContext, TimestampProvider, optimize,
};
use crate::{coord, metrics};
67
impl PeekClient {
69    /// Attempt to sequence a peek from the session task.
70    ///
71    /// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the
72    /// Coordinator's sequencing. If it returns an error, it should be returned to the user.
73    ///
74    /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
75    /// triggering the execution of the underlying query.
76    pub(crate) async fn try_frontend_peek(
77        &mut self,
78        portal_name: &str,
79        session: &mut Session,
80        outer_ctx_extra: &mut Option<ExecuteContextGuard>,
81    ) -> Result<Option<ExecuteResponse>, AdapterError> {
82        // # From handle_execute
83
84        if session.vars().emit_trace_id_notice() {
85            let span_context = tracing::Span::current()
86                .context()
87                .span()
88                .span_context()
89                .clone();
90            if span_context.is_valid() {
91                session.add_notice(AdapterNotice::QueryTrace {
92                    trace_id: span_context.trace_id(),
93                });
94            }
95        }
96
97        // TODO(peek-seq): This snapshot is wasted when we end up bailing out from the frontend peek
98        // sequencing. We could solve this is with that optimization where we
99        // continuously keep a catalog snapshot in the session, and only get a new one when the
100        // catalog revision has changed, which we could see with an atomic read.
101        // But anyhow, this problem will just go away when we reach the point that we never fall
102        // back to the old sequencing.
103        let catalog = self.catalog_snapshot("try_frontend_peek").await;
104
105        // Extract things from the portal.
106        let (stmt, params, logging, lifecycle_timestamps) = {
107            if let Err(err) = Coordinator::verify_portal(&*catalog, session, portal_name) {
108                outer_ctx_extra
109                    .take()
110                    .and_then(|guard| guard.defuse().retire());
111                return Err(err);
112            }
113            let portal = session
114                .get_portal_unverified(portal_name)
115                // The portal is a session-level thing, so it couldn't have concurrently disappeared
116                // since the above verification.
117                .expect("called verify_portal above");
118            let params = portal.parameters.clone();
119            let stmt = portal.stmt.clone();
120            let logging = Arc::clone(&portal.logging);
121            let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
122            (stmt, params, logging, lifecycle_timestamps)
123        };
124
125        // Before planning, check if this is a statement type we can handle.
126        // This must happen BEFORE statement logging setup to avoid orphaned execution records.
127        if let Some(ref stmt) = stmt {
128            match &**stmt {
129                Statement::Select(_)
130                | Statement::ExplainAnalyzeObject(_)
131                | Statement::ExplainAnalyzeCluster(_)
132                | Statement::Show(ShowStatement::ShowObjects(_))
133                | Statement::Show(ShowStatement::ShowColumns(_)) => {
134                    // These are always fine, just continue.
135                    // Note: EXPLAIN ANALYZE will `plan` to `Plan::Select`.
136                    // Note: ShowObjects plans to `Plan::Select`, ShowColumns plans to `Plan::ShowColumns`.
137                    // We handle `Plan::ShowColumns` specially in `try_frontend_peek_inner`.
138                }
139                Statement::ExplainPlan(explain_stmt) => {
140                    // Only handle ExplainPlan for SELECT statements.
141                    // We don't want to handle e.g. EXPLAIN CREATE MATERIALIZED VIEW here, because that
142                    // requires purification before planning, which the frontend peek sequencing doesn't
143                    // do.
144                    match &explain_stmt.explainee {
145                        mz_sql_parser::ast::Explainee::Select(..) => {
146                            // This is a SELECT, continue
147                        }
148                        _ => {
149                            debug!(
150                                "Bailing out from try_frontend_peek, because EXPLAIN is not for a SELECT query"
151                            );
152                            return Ok(None);
153                        }
154                    }
155                }
156                Statement::ExplainPushdown(explain_stmt) => {
157                    // Only handle EXPLAIN FILTER PUSHDOWN for non-BROKEN SELECT statements
158                    match &explain_stmt.explainee {
159                        mz_sql_parser::ast::Explainee::Select(_, false) => {}
160                        _ => {
161                            debug!(
162                                "Bailing out from try_frontend_peek, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is for EXPLAIN BROKEN"
163                            );
164                            return Ok(None);
165                        }
166                    }
167                }
168                Statement::Copy(copy_stmt) => {
169                    match &copy_stmt.direction {
170                        CopyDirection::To => {
171                            // This is COPY TO (...), continue
172                        }
173                        CopyDirection::From => {
174                            debug!(
175                                "Bailing out from try_frontend_peek, because COPY FROM is not supported"
176                            );
177                            return Ok(None);
178                        }
179                    }
180                }
181
182                Statement::Subscribe(_)
183                    if ENABLE_FRONTEND_SUBSCRIBES.get(catalog.system_config().dyncfgs()) =>
184                {
185                    // We have a subscribe statement to process; continue.
186                }
187                _ => {
188                    debug!(
189                        "Bailing out from try_frontend_peek, because statement type is not supported"
190                    );
191                    return Ok(None);
192                }
193            }
194        }
195
196        // Set up statement logging, and log the beginning of execution.
197        // (But only if we're not executing in the context of another statement.)
198        let statement_logging_id = if outer_ctx_extra.is_none() {
199            // This is a new statement, so begin statement logging
200            let result = self.statement_logging_frontend.begin_statement_execution(
201                session,
202                &params,
203                &logging,
204                catalog.system_config(),
205                lifecycle_timestamps,
206            );
207
208            if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result {
209                self.log_began_execution(began_execution, mseh_update, prepared_statement);
210                Some(logging_id)
211            } else {
212                None
213            }
214        } else {
215            // We're executing in the context of another statement (e.g., FETCH),
216            // so extract the statement logging ID from the outer context if present.
217            // We take ownership and retire the outer context here. The end of execution will be
218            // logged in one of the following ways:
219            // - At the end of this function, if the execution is finished by then.
220            // - Later by the Coordinator, either due to RegisterFrontendPeek or ExecuteSlowPathPeek.
221            outer_ctx_extra
222                .take()
223                .and_then(|guard| guard.defuse().retire())
224        };
225
226        let result = self
227            .try_frontend_peek_inner(session, catalog, stmt, params, statement_logging_id)
228            .await;
229
230        // Log the end of execution if we are logging this statement and execution has already
231        // ended.
232        if let Some(logging_id) = statement_logging_id {
233            let reason = match &result {
234                // Streaming results are handled asynchronously by the coordinator
235                Ok(Some(
236                    ExecuteResponse::SendingRowsStreaming { .. }
237                    | ExecuteResponse::Subscribing { .. },
238                )) => {
239                    // Don't log here - the peek or subscribe is still executing.
240                    // It will be logged when handle_peek_notification is called.
241                    return result;
242                }
243                // COPY TO needs to check its inner response
244                Ok(Some(resp @ ExecuteResponse::CopyTo { resp: inner, .. })) => {
245                    match inner.as_ref() {
246                        ExecuteResponse::SendingRowsStreaming { .. }
247                        | ExecuteResponse::Subscribing { .. } => {
248                            // Don't log here - the peek or subscribe is still executing.
249                            // It will be logged when handle_peek_notification is called.
250                            return result;
251                        }
252                        // For non-streaming COPY TO responses, use the outer CopyTo for conversion
253                        _ => resp.into(),
254                    }
255                }
256                // Bailout case, which should not happen
257                Ok(None) => {
258                    soft_panic_or_log!(
259                        "Bailed out from `try_frontend_peek_inner` after we already logged the beginning of statement execution."
260                    );
261                    // This statement will be handled by the old peek sequencing, which will do its
262                    // own statement logging from the beginning. So, let's close out this one.
263                    self.log_ended_execution(
264                        logging_id,
265                        StatementEndedExecutionReason::Errored {
266                            error: "Internal error: bailed out from `try_frontend_peek_inner`"
267                                .to_string(),
268                        },
269                    );
270                    return result;
271                }
272                // All other success responses - use the From implementation
273                // TODO(peek-seq): After we delete the old peek sequencing, we'll be able to adjust
274                // the From implementation to do exactly what we need in the frontend peek
275                // sequencing, so that the above special cases won't be needed.
276                Ok(Some(resp)) => resp.into(),
277                Err(e) => StatementEndedExecutionReason::Errored {
278                    error: e.to_string(),
279                },
280            };
281
282            self.log_ended_execution(logging_id, reason);
283        }
284
285        result
286    }
287
    /// This is encapsulated in an inner function so that the outer function can still do statement
    /// logging after the `?` returns of the inner function.
290    async fn try_frontend_peek_inner(
291        &mut self,
292        session: &mut Session,
293        catalog: Arc<Catalog>,
294        stmt: Option<Arc<Statement<Raw>>>,
295        params: Params,
296        statement_logging_id: Option<crate::statement_logging::StatementLoggingId>,
297    ) -> Result<Option<ExecuteResponse>, AdapterError> {
298        let stmt = match stmt {
299            Some(stmt) => stmt,
300            None => {
301                debug!("try_frontend_peek_inner succeeded on an empty query");
302                return Ok(Some(ExecuteResponse::EmptyQuery));
303            }
304        };
305
306        session
307            .metrics()
308            .query_total(&[
309                metrics::session_type_label_value(session.user()),
310                metrics::statement_type_label_value(&stmt),
311            ])
312            .inc();
313
314        // # From handle_execute_inner
315
316        let conn_catalog = catalog.for_session(session);
317        // (`resolved_ids` should be derivable from `stmt`. If `stmt` is later transformed to
318        // remove/add IDs, then `resolved_ids` should be updated to also remove/add those IDs.)
319        let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?;
320
321        let pcx = session.pcx();
322        let plan = mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, &params, &resolved_ids)?;
323
324        /// What do we do with the result of the select?
325        enum QueryPlan<'a> {
326            Select(&'a SelectPlan),
327            CopyTo(&'a SelectPlan, CopyToContext),
328            Subscribe(&'a SubscribePlan),
329        }
330
331        let (query_plan, explain_ctx) = match &plan {
332            Plan::Select(select_plan) => {
333                let explain_ctx = if session.vars().emit_plan_insights_notice() {
334                    let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
335                    ExplainContext::PlanInsightsNotice(optimizer_trace)
336                } else {
337                    ExplainContext::None
338                };
339                (QueryPlan::Select(select_plan), explain_ctx)
340            }
341            Plan::ShowColumns(show_columns_plan) => {
342                // ShowColumns wraps a SelectPlan, extract it and proceed as normal.
343                (
344                    QueryPlan::Select(&show_columns_plan.select_plan),
345                    ExplainContext::None,
346                )
347            }
348            Plan::ExplainPlan(plan::ExplainPlanPlan {
349                stage,
350                format,
351                config,
352                explainee: Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc }),
353            }) => {
354                // Create OptimizerTrace to collect optimizer plans
355                let optimizer_trace = OptimizerTrace::new(stage.paths());
356                let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
357                    broken: *broken,
358                    config: config.clone(),
359                    format: *format,
360                    stage: *stage,
361                    replan: None,
362                    desc: Some(desc.clone()),
363                    optimizer_trace,
364                });
365                (QueryPlan::Select(plan), explain_ctx)
366            }
367            // COPY TO S3
368            Plan::CopyTo(plan::CopyToPlan {
369                select_plan,
370                desc,
371                to,
372                connection,
373                connection_id,
374                format,
375                max_file_size,
376            }) => {
377                let uri = eval_copy_to_uri(to.clone(), session, catalog.state())?;
378
379                // (output_batch_count will be set later)
380                let copy_to_ctx = CopyToContext {
381                    desc: desc.clone(),
382                    uri,
383                    connection: connection.clone(),
384                    connection_id: *connection_id,
385                    format: format.clone(),
386                    max_file_size: *max_file_size,
387                    output_batch_count: None,
388                };
389
390                (
391                    QueryPlan::CopyTo(select_plan, copy_to_ctx),
392                    ExplainContext::None,
393                )
394            }
395            Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => {
396                // Only handle EXPLAIN FILTER PUSHDOWN for SELECT statements
397                match explainee {
398                    plan::Explainee::Statement(plan::ExplaineeStatement::Select {
399                        broken: false,
400                        plan,
401                        desc: _,
402                    }) => {
403                        let explain_ctx = ExplainContext::Pushdown;
404                        (QueryPlan::Select(plan), explain_ctx)
405                    }
406                    _ => {
407                        // This shouldn't happen because we already checked for this at the AST
408                        // level before calling `try_frontend_peek_inner`.
409                        soft_panic_or_log!(
410                            "unexpected EXPLAIN FILTER PUSHDOWN plan kind in frontend peek sequencing: {:?}",
411                            explainee
412                        );
413                        debug!(
414                            "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN"
415                        );
416                        return Ok(None);
417                    }
418                }
419            }
420            Plan::SideEffectingFunc(sef_plan) => {
421                // Side-effecting functions need Coordinator state (e.g., active_conns),
422                // so delegate to the Coordinator via a Command.
423                // The RBAC check is performed in the Coordinator where active_conns is available.
424                let response = self
425                    .call_coordinator(|tx| Command::ExecuteSideEffectingFunc {
426                        plan: sef_plan.clone(),
427                        conn_id: session.conn_id().clone(),
428                        current_role: session.role_metadata().current_role,
429                        tx,
430                    })
431                    .await?;
432                return Ok(Some(response));
433            }
434            Plan::Subscribe(subscribe) => (QueryPlan::Subscribe(subscribe), ExplainContext::None),
435            _ => {
436                // This shouldn't happen because we already checked for this at the AST
437                // level before calling `try_frontend_peek_inner`.
438                soft_panic_or_log!(
439                    "Unexpected plan kind in frontend peek sequencing: {:?}",
440                    plan
441                );
442                debug!(
443                    "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, side-effecting SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO S3"
444                );
445                return Ok(None);
446            }
447        };
448
449        let when = match query_plan {
450            QueryPlan::Select(s) => &s.when,
451            QueryPlan::CopyTo(s, _) => &s.when,
452            QueryPlan::Subscribe(s) => &s.when,
453        };
454
455        let depends_on = match query_plan {
456            QueryPlan::Select(s) => s.source.depends_on(),
457            QueryPlan::CopyTo(s, _) => s.source.depends_on(),
458            QueryPlan::Subscribe(s) => s.from.depends_on(),
459        };
460
461        let contains_temporal = match query_plan {
462            QueryPlan::Select(s) => s.source.contains_temporal(),
463            QueryPlan::CopyTo(s, _) => s.source.contains_temporal(),
464            QueryPlan::Subscribe(s) => Ok(s.from.contains_temporal()),
465        };
466
467        // # From sequence_plan
468
469        // We have checked the plan kind above.
470        assert!(plan.allowed_in_read_only());
471
472        let (cluster, target_cluster_id, target_cluster_name) = {
473            let target_cluster = match session.transaction().cluster() {
474                // Use the current transaction's cluster.
475                Some(cluster_id) => TargetCluster::Transaction(cluster_id),
476                // If there isn't a current cluster set for a transaction, then try to auto route.
477                None => coord::catalog_serving::auto_run_on_catalog_server(
478                    &conn_catalog,
479                    session,
480                    &plan,
481                ),
482            };
483            let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
484            (cluster, cluster.id, &cluster.name)
485        };
486
487        // Log cluster selection
488        if let Some(logging_id) = &statement_logging_id {
489            self.log_set_cluster(*logging_id, target_cluster_id);
490        }
491
492        coord::catalog_serving::check_cluster_restrictions(
493            target_cluster_name.as_str(),
494            &conn_catalog,
495            &plan,
496        )?;
497
498        rbac::check_plan(
499            &conn_catalog,
500            // We can't look at `active_conns` here, but that's ok, because this case was handled
501            // above already inside `Command::ExecuteSideEffectingFunc`.
502            None::<fn(u32) -> Option<RoleId>>,
503            session,
504            &plan,
505            Some(target_cluster_id),
506            &resolved_ids,
507        )?;
508
509        if let Some((_, wait_future)) =
510            coord::appends::waiting_on_startup_appends(&*catalog, session, &plan)
511        {
512            wait_future.await;
513        }
514
515        let max_query_result_size = Some(session.vars().max_query_result_size());
516
517        // # From sequence_peek
518
519        // # From peek_validate
520
521        let compute_instance_snapshot =
522            ComputeInstanceSnapshot::new_without_collections(cluster.id());
523
524        let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config())
525            .override_from(&catalog.get_cluster(cluster.id()).config.features())
526            .override_from(&explain_ctx);
527
528        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
529            return Err(AdapterError::NoClusterReplicasAvailable {
530                name: cluster.name.clone(),
531                is_managed: cluster.is_managed(),
532            });
533        }
534
535        let (_, view_id) = self.transient_id_gen.allocate_id();
536        let (_, index_id) = self.transient_id_gen.allocate_id();
537
538        let target_replica_name = session.vars().cluster_replica();
539        let mut target_replica = target_replica_name
540            .map(|name| {
541                cluster
542                    .replica_id(name)
543                    .ok_or(AdapterError::UnknownClusterReplica {
544                        cluster_name: cluster.name.clone(),
545                        replica_name: name.to_string(),
546                    })
547            })
548            .transpose()?;
549
550        let source_ids = depends_on;
551        // TODO(peek-seq): validate_timeline_context can be expensive in real scenarios (not in
552        // simple benchmarks), because it traverses transitive dependencies even of indexed views and
553        // materialized views (also traversing their MIR plans).
554        let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?;
555        if matches!(timeline_context, TimelineContext::TimestampIndependent) && contains_temporal? {
556            // If the source IDs are timestamp independent but the query contains temporal functions,
557            // then the timeline context needs to be upgraded to timestamp dependent. This is
558            // required because `source_ids` doesn't contain functions.
559            timeline_context = TimelineContext::TimestampDependent;
560        }
561
562        let notices = coord::sequencer::check_log_reads(
563            &catalog,
564            cluster,
565            &source_ids,
566            &mut target_replica,
567            session.vars(),
568        )?;
569        session.add_notices(notices);
570
571        // # From peek_linearize_timestamp
572
573        let isolation_level = session.vars().transaction_isolation().clone();
574        let timeline = Coordinator::get_timeline(&timeline_context);
575        let needs_linearized_read_ts =
576            Coordinator::needs_linearized_read_ts(&isolation_level, when);
577
578        let oracle_read_ts = match timeline {
579            Some(timeline) if needs_linearized_read_ts => {
580                let oracle = self.ensure_oracle(timeline).await?;
581                let oracle_read_ts = oracle.read_ts().await;
582                Some(oracle_read_ts)
583            }
584            Some(_) | None => None,
585        };
586
587        // # From peek_real_time_recency
588
589        let vars = session.vars();
590        let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
591            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
592            && !session.contains_read_timestamp()
593        {
594            // Only call the coordinator when we actually need real-time recency
595            self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
596                source_ids: source_ids.clone(),
597                real_time_recency_timeout: *vars.real_time_recency_timeout(),
598                tx,
599            })
600            .await?
601        } else {
602            None
603        };
604
605        // # From peek_timestamp_read_hold
606
607        let dataflow_builder =
608            DataflowBuilder::new(catalog.state(), compute_instance_snapshot.clone());
609        let input_id_bundle = dataflow_builder.sufficient_collections(source_ids.clone());
610
611        // ## From sequence_peek_timestamp
612
613        // Warning: This will be false for AS OF queries, even if we are otherwise inside a
614        // multi-statement transaction. (It's also false for FreshestTableWrite, which is currently
615        // only read-then-write queries, which can't be part of multi-statement transactions, so
616        // FreshestTableWrite doesn't matter.)
617        //
618        // TODO(peek-seq): It's not totally clear to me what the intended semantics are for AS OF
619        // queries inside a transaction: We clearly can't use the transaction timestamp, but the old
620        // peek sequencing still does a timedomain validation. The new peek sequencing does not do
621        // timedomain validation for AS OF queries, which seems more natural. But I'm thinking that
622        // it would be the cleanest to just simply disallow AS OF queries inside transactions.
623        let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when)
624            && !matches!(query_plan, QueryPlan::Subscribe { .. });
625
626        // Fetch or generate a timestamp for this query and fetch or acquire read holds.
627        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
628            // Use the transaction's timestamp if it exists and this isn't an AS OF query.
629            // (`in_immediate_multi_stmt_txn` is false for AS OF queries.)
630            Some(
631                determination @ TimestampDetermination {
632                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
633                    ..
634                },
635            ) if in_immediate_multi_stmt_txn => {
636                // This is a subsequent (non-AS OF, non-constant) query in a multi-statement
637                // transaction. We now:
638                // - Validate that the query only accesses collections within the transaction's
639                //   timedomain (which we know from the stored read holds).
640                // - Use the transaction's stored timestamp determination.
641                // - Use the (relevant subset of the) transaction's read holds.
642
643                let txn_read_holds_opt = self
644                    .call_coordinator(|tx| Command::GetTransactionReadHoldsBundle {
645                        conn_id: session.conn_id().clone(),
646                        tx,
647                    })
648                    .await;
649
650                if let Some(txn_read_holds) = txn_read_holds_opt {
651                    let allowed_id_bundle = txn_read_holds.id_bundle();
652                    let outside = input_id_bundle.difference(&allowed_id_bundle);
653
654                    // Queries without a timestamp and timeline can belong to any existing timedomain.
655                    if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
656                        let valid_names =
657                            allowed_id_bundle.resolve_names(&*catalog, session.conn_id());
658                        let invalid_names = outside.resolve_names(&*catalog, session.conn_id());
659                        return Err(AdapterError::RelationOutsideTimeDomain {
660                            relations: invalid_names,
661                            names: valid_names,
662                        });
663                    }
664
665                    // Extract the subset of read holds for the collections this query accesses.
666                    let read_holds = txn_read_holds.subset(&input_id_bundle);
667
668                    (determination, read_holds)
669                } else {
670                    // This should never happen: we're in a subsequent query of a multi-statement
671                    // transaction (we have a transaction timestamp), but the coordinator has no
672                    // transaction read holds stored. This indicates a bug in the transaction
673                    // handling.
674                    return Err(AdapterError::Internal(
675                        "Missing transaction read holds for multi-statement transaction"
676                            .to_string(),
677                    ));
678                }
679            }
680            _ => {
681                // There is no timestamp determination yet for this transaction. Either:
682                // - We are not in a multi-statement transaction.
683                // - This is the first (non-AS OF) query in a multi-statement transaction.
684                // - This is an AS OF query.
685                // - This is a constant query (`TimestampContext::NoTimestamp`).
686
687                let timedomain_bundle;
688                let determine_bundle = if in_immediate_multi_stmt_txn {
689                    // This is the first (non-AS OF) query in a multi-statement transaction.
690                    // Determine a timestamp that will be valid for anything in any schema
691                    // referenced by the first query.
692                    timedomain_bundle = timedomain_for(
693                        &*catalog,
694                        &dataflow_builder,
695                        &source_ids,
696                        &timeline_context,
697                        session.conn_id(),
698                        target_cluster_id,
699                    )?;
700                    &timedomain_bundle
701                } else {
702                    // Simply use the inputs of the current query.
703                    &input_id_bundle
704                };
705                let (determination, read_holds) = self
706                    .frontend_determine_timestamp(
707                        session,
708                        determine_bundle,
709                        when,
710                        target_cluster_id,
711                        &timeline_context,
712                        oracle_read_ts,
713                        real_time_recency_ts,
714                    )
715                    .await?;
716
717                // If this is the first (non-AS OF) query in a multi-statement transaction, store
718                // the read holds in the coordinator, so subsequent queries can validate against
719                // them.
720                if in_immediate_multi_stmt_txn {
721                    self.call_coordinator(|tx| Command::StoreTransactionReadHolds {
722                        conn_id: session.conn_id().clone(),
723                        read_holds: read_holds.clone(),
724                        tx,
725                    })
726                    .await;
727                }
728
729                (determination, read_holds)
730            }
731        };
732
733        {
734            // Assert that we have a read hold for all the collections in our `input_id_bundle`.
735            for id in input_id_bundle.iter() {
736                let s = read_holds.storage_holds.contains_key(&id);
737                let c = read_holds
738                    .compute_ids()
739                    .map(|(_instance, coll)| coll)
740                    .contains(&id);
741                soft_assert_or_log!(
742                    s || c,
743                    "missing read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
744                    id,
745                    in_immediate_multi_stmt_txn,
746                );
747            }
748
749            // Assert that each part of the `input_id_bundle` corresponds to the right part of
750            // `read_holds`.
751            for id in input_id_bundle.storage_ids.iter() {
752                soft_assert_or_log!(
753                    read_holds.storage_holds.contains_key(id),
754                    "missing storage read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
755                    id,
756                    in_immediate_multi_stmt_txn,
757                );
758            }
759            for id in input_id_bundle
760                .compute_ids
761                .iter()
762                .flat_map(|(_instance, colls)| colls)
763            {
764                soft_assert_or_log!(
765                    read_holds
766                        .compute_ids()
767                        .map(|(_instance, coll)| coll)
768                        .contains(id),
769                    "missing compute read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
770                    id,
771                    in_immediate_multi_stmt_txn,
772                );
773            }
774        }
775
776        // (TODO(peek-seq): The below TODO is copied from the old peek sequencing. We should resolve
777        // this when we decide what to do with `AS OF` in transactions.)
778        // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
779        // arbitrary and we don't recall why we did it (possibly an error!). Change this to always
780        // set the transaction ops. Decide and document what our policy should be on AS OF queries.
781        // Maybe they shouldn't be allowed in transactions at all because it's hard to explain
782        // what's going on there. This should probably get a small design document.
783
784        // We only track the peeks in the session if the query doesn't use AS
785        // OF or we're inside an explicit transaction. The latter case is
786        // necessary to support PG's `BEGIN` semantics, whose behavior can
787        // depend on whether or not reads have occurred in the txn.
788        let requires_linearization = (&explain_ctx).into();
789        let mut transaction_determination = determination.clone();
790        match query_plan {
791            QueryPlan::Subscribe { .. } => {
792                if when.is_transactional() {
793                    session.add_transaction_ops(TransactionOps::Subscribe)?;
794                }
795            }
796            QueryPlan::Select(..) | QueryPlan::CopyTo(..) => {
797                if when.is_transactional() {
798                    session.add_transaction_ops(TransactionOps::Peeks {
799                        determination: transaction_determination,
800                        cluster_id: target_cluster_id,
801                        requires_linearization,
802                    })?;
803                } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
804                    // If the query uses AS OF, then ignore the timestamp.
805                    transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
806                    session.add_transaction_ops(TransactionOps::Peeks {
807                        determination: transaction_determination,
808                        cluster_id: target_cluster_id,
809                        requires_linearization,
810                    })?;
811                }
812            }
813        }
814
815        // # From peek_optimize
816
817        let stats = statistics_oracle(
818            session,
819            &source_ids,
820            &determination.timestamp_context.antichain(),
821            true,
822            catalog.system_config(),
823            &*self.storage_collections,
824        )
825        .await
826        .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
827
828        // Generate data structures that can be moved to another task where we will perform possibly
829        // expensive optimizations.
830        let timestamp_context = determination.timestamp_context.clone();
831        let session_meta = session.meta();
832        let now = catalog.config().now.clone();
833        let target_cluster_name = target_cluster_name.clone();
834        let needs_plan_insights = explain_ctx.needs_plan_insights();
835        let determination_for_pushdown = if matches!(explain_ctx, ExplainContext::Pushdown) {
836            // This is a hairy data structure, so avoid this clone if we are not in
837            // EXPLAIN FILTER PUSHDOWN.
838            Some(determination.clone())
839        } else {
840            None
841        };
842
843        let span = Span::current();
844
845        // Prepare data for plan insights if needed
846        let catalog_for_insights = if needs_plan_insights {
847            Some(Arc::clone(&catalog))
848        } else {
849            None
850        };
851        let mut compute_instances = BTreeMap::new();
852        if needs_plan_insights {
853            for user_cluster in catalog.user_clusters() {
854                let snapshot = ComputeInstanceSnapshot::new_without_collections(user_cluster.id);
855                compute_instances.insert(user_cluster.name.clone(), snapshot);
856            }
857        }
858
859        let source_ids_for_closure = source_ids.clone();
860
861        let optimization_future: JoinHandle<Result<_, AdapterError>> = match query_plan {
862            QueryPlan::CopyTo(select_plan, mut copy_to_ctx) => {
863                let raw_expr = select_plan.source.clone();
864
865                // COPY TO path: calculate output_batch_count and create copy_to optimizer
866                let worker_counts = cluster.replicas().map(|r| {
867                    let loc = &r.config.location;
868                    loc.workers().unwrap_or_else(|| loc.num_processes())
869                });
870                let max_worker_count = match worker_counts.max() {
871                    Some(count) => u64::cast_from(count),
872                    None => {
873                        return Err(AdapterError::NoClusterReplicasAvailable {
874                            name: cluster.name.clone(),
875                            is_managed: cluster.is_managed(),
876                        });
877                    }
878                };
879                copy_to_ctx.output_batch_count = Some(max_worker_count);
880
881                let mut optimizer = optimize::copy_to::Optimizer::new(
882                    Arc::clone(&catalog),
883                    compute_instance_snapshot,
884                    view_id,
885                    copy_to_ctx,
886                    optimizer_config,
887                    self.optimizer_metrics.clone(),
888                );
889
890                mz_ore::task::spawn_blocking(
891                    || "optimize copy-to",
892                    move || {
893                        span.in_scope(|| {
894                            let _dispatch_guard = explain_ctx.dispatch_guard();
895
896                            // COPY TO path
897                            // HIR ⇒ MIR lowering and MIR optimization (local)
898                            let local_mir_plan =
899                                optimizer.catch_unwind_optimize(raw_expr.clone())?;
900                            // Attach resolved context required to continue the pipeline.
901                            let local_mir_plan = local_mir_plan.resolve(
902                                timestamp_context.clone(),
903                                &session_meta,
904                                stats,
905                            );
906                            // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
907                            let global_lir_plan =
908                                optimizer.catch_unwind_optimize(local_mir_plan)?;
909                            Ok(Execution::CopyToS3 {
910                                global_lir_plan,
911                                source_ids: source_ids_for_closure,
912                            })
913                        })
914                    },
915                )
916            }
917            QueryPlan::Select(select_plan) => {
918                let select_plan = select_plan.clone();
919                let raw_expr = select_plan.source.clone();
920
921                // SELECT/EXPLAIN path: create peek optimizer
922                let mut optimizer = optimize::peek::Optimizer::new(
923                    Arc::clone(&catalog),
924                    compute_instance_snapshot,
925                    select_plan.finishing.clone(),
926                    view_id,
927                    index_id,
928                    optimizer_config,
929                    self.optimizer_metrics.clone(),
930                );
931
932                mz_ore::task::spawn_blocking(
933                    || "optimize peek",
934                    move || {
935                        span.in_scope(|| {
936                            let _dispatch_guard = explain_ctx.dispatch_guard();
937
938                            // SELECT/EXPLAIN path
939                            // HIR ⇒ MIR lowering and MIR optimization (local)
940
941                            // The purpose of wrapping the following in a closure is to control where the
942                            // `?`s return from, so that even when a `catch_unwind_optimize` call fails,
943                            // we can still handle `EXPLAIN BROKEN`.
944                            let pipeline = || {
945                                let local_mir_plan =
946                                    optimizer.catch_unwind_optimize(raw_expr.clone())?;
947                                // Attach resolved context required to continue the pipeline.
948                                let local_mir_plan = local_mir_plan.resolve(
949                                    timestamp_context.clone(),
950                                    &session_meta,
951                                    stats,
952                                );
953                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
954                                let global_lir_plan =
955                                    optimizer.catch_unwind_optimize(local_mir_plan)?;
956                                Ok::<_, AdapterError>(global_lir_plan)
957                            };
958
959                            let global_lir_plan_result = pipeline();
960                            let optimization_finished_at = now();
961
962                            let create_insights_ctx =
963                                |optimizer: &optimize::peek::Optimizer,
964                                 is_notice: bool|
965                                 -> Option<Box<PlanInsightsContext>> {
966                                    if !needs_plan_insights {
967                                        return None;
968                                    }
969
970                                    let catalog = catalog_for_insights.as_ref()?;
971
972                                    let enable_re_optimize = if needs_plan_insights {
973                                        // Disable any plan insights that use the optimizer if we only want the
974                                        // notice and plan optimization took longer than the threshold. This is
975                                        // to prevent a situation where optimizing takes a while and there are
976                                        // lots of clusters, which would delay peek execution by the product of
977                                        // those.
978                                        //
979                                        // (This heuristic doesn't work well, see #9492.)
980                                        let dyncfgs = catalog.system_config().dyncfgs();
981                                        let opt_limit = mz_adapter_types::dyncfgs
982                                        ::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
983                                            .get(dyncfgs);
984                                        !(is_notice && optimizer.duration() > opt_limit)
985                                    } else {
986                                        false
987                                    };
988
989                                    Some(Box::new(PlanInsightsContext {
990                                        stmt: select_plan
991                                            .select
992                                            .as_deref()
993                                            .map(Clone::clone)
994                                            .map(Statement::Select),
995                                        raw_expr: raw_expr.clone(),
996                                        catalog: Arc::clone(catalog),
997                                        compute_instances,
998                                        target_instance: target_cluster_name,
999                                        metrics: optimizer.metrics().clone(),
1000                                        finishing: optimizer.finishing().clone(),
1001                                        optimizer_config: optimizer.config().clone(),
1002                                        session: session_meta,
1003                                        timestamp_context,
1004                                        view_id: optimizer.select_id(),
1005                                        index_id: optimizer.index_id(),
1006                                        enable_re_optimize,
1007                                    }))
1008                                };
1009
1010                            let global_lir_plan = match global_lir_plan_result {
1011                                Ok(plan) => plan,
1012                                Err(err) => {
1013                                    let result = if let ExplainContext::Plan(explain_ctx) =
1014                                        explain_ctx
1015                                        && explain_ctx.broken
1016                                    {
1017                                        // EXPLAIN BROKEN: log error and continue with defaults
1018                                        tracing::error!(
1019                                            "error while handling EXPLAIN statement: {}",
1020                                            err
1021                                        );
1022                                        Ok(Execution::ExplainPlan {
1023                                            df_meta: Default::default(),
1024                                            explain_ctx,
1025                                            optimizer,
1026                                            insights_ctx: None,
1027                                        })
1028                                    } else {
1029                                        Err(err)
1030                                    };
1031                                    return result;
1032                                }
1033                            };
1034
1035                            match explain_ctx {
1036                                ExplainContext::Plan(explain_ctx) => {
1037                                    let (_, df_meta, _) = global_lir_plan.unapply();
1038                                    let insights_ctx = create_insights_ctx(&optimizer, false);
1039                                    Ok(Execution::ExplainPlan {
1040                                        df_meta,
1041                                        explain_ctx,
1042                                        optimizer,
1043                                        insights_ctx,
1044                                    })
1045                                }
1046                                ExplainContext::None => Ok(Execution::Peek {
1047                                    global_lir_plan,
1048                                    optimization_finished_at,
1049                                    plan_insights_optimizer_trace: None,
1050                                    finishing: select_plan.finishing,
1051                                    copy_to: select_plan.copy_to,
1052                                    insights_ctx: None,
1053                                }),
1054                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
1055                                    let insights_ctx = create_insights_ctx(&optimizer, true);
1056                                    Ok(Execution::Peek {
1057                                        global_lir_plan,
1058                                        optimization_finished_at,
1059                                        plan_insights_optimizer_trace: Some(optimizer_trace),
1060                                        finishing: select_plan.finishing,
1061                                        copy_to: select_plan.copy_to,
1062                                        insights_ctx,
1063                                    })
1064                                }
1065                                ExplainContext::Pushdown => {
1066                                    let (plan, _, _) = global_lir_plan.unapply();
1067                                    let imports = match plan {
1068                                        PeekPlan::SlowPath(plan) => plan
1069                                            .desc
1070                                            .source_imports
1071                                            .into_iter()
1072                                            .filter_map(|(id, import)| {
1073                                                import.desc.arguments.operators.map(|mfp| (id, mfp))
1074                                            })
1075                                            .collect(),
1076                                        PeekPlan::FastPath(_) => {
1077                                            std::collections::BTreeMap::default()
1078                                        }
1079                                    };
1080                                    Ok(Execution::ExplainPushdown {
1081                                        imports,
1082                                        determination: determination_for_pushdown
1083                                            .expect("it's present for the ExplainPushdown case"),
1084                                    })
1085                                }
1086                            }
1087                        })
1088                    },
1089                )
1090            }
1091            QueryPlan::Subscribe(plan) => {
1092                let plan = plan.clone();
1093                let catalog: Arc<Catalog> = Arc::clone(&catalog);
1094                let debug_name = format!("subscribe-{}", index_id);
1095                let mut optimizer = optimize::subscribe::Optimizer::new(
1096                    catalog,
1097                    compute_instance_snapshot.clone(),
1098                    view_id,
1099                    index_id,
1100                    plan.with_snapshot,
1101                    plan.up_to,
1102                    debug_name,
1103                    optimizer_config,
1104                    self.optimizer_metrics.clone(),
1105                );
1106                mz_ore::task::spawn_blocking(
1107                    || "optimize subscribe",
1108                    move || {
1109                        span.in_scope(|| {
1110                            let _dispatch_guard = explain_ctx.dispatch_guard();
1111
1112                            let global_mir_plan = optimizer.catch_unwind_optimize(plan.clone())?;
1113                            let as_of = timestamp_context.timestamp_or_default();
1114
1115                            if let Some(up_to) = optimizer.up_to() {
1116                                if as_of > up_to {
1117                                    return Err(AdapterError::AbsurdSubscribeBounds {
1118                                        as_of,
1119                                        up_to,
1120                                    });
1121                                }
1122                            }
1123                            let local_mir_plan =
1124                                global_mir_plan.resolve(Antichain::from_elem(as_of));
1125
1126                            let global_lir_plan =
1127                                optimizer.catch_unwind_optimize(local_mir_plan)?;
1128                            let optimization_finished_at = now();
1129
1130                            let (df_desc, df_meta) = global_lir_plan.unapply();
1131                            Ok(Execution::Subscribe {
1132                                subscribe_plan: plan,
1133                                df_desc,
1134                                df_meta,
1135                                optimization_finished_at,
1136                            })
1137                        })
1138                    },
1139                )
1140            }
1141        };
1142
1143        let optimization_timeout = *session.vars().statement_timeout();
1144        let optimization_result =
1145            // Note: spawn_blocking tasks cannot be cancelled, so on timeout we stop waiting but the
1146            // optimization task continues running in the background until completion. See
1147            // https://github.com/MaterializeInc/database-issues/issues/8644 for properly cancelling
1148            // optimizer runs.
1149            match tokio::time::timeout(optimization_timeout, optimization_future).await {
1150                Ok(Ok(result)) => result,
1151                Ok(Err(AdapterError::Optimizer(err))) => {
1152                    return Err(AdapterError::Internal(format!(
1153                        "internal error in optimizer: {}",
1154                        err
1155                    )));
1156                }
1157                Ok(Err(err)) => {
1158                    return Err(err);
1159                }
1160                Err(_elapsed) => {
1161                    warn!("optimize peek timed out after {:?}", optimization_timeout);
1162                    return Err(AdapterError::StatementTimeout);
1163                }
1164            };
1165
1166        // Log optimization finished
1167        if let Some(logging_id) = &statement_logging_id {
1168            self.log_lifecycle_event(*logging_id, StatementLifecycleEvent::OptimizationFinished);
1169        }
1170
1171        // Assert that read holds are correct for the execution plan
1172        Self::assert_read_holds_correct(
1173            &read_holds,
1174            &optimization_result,
1175            &determination,
1176            target_cluster_id,
1177            in_immediate_multi_stmt_txn,
1178        );
1179
1180        // Handle the optimization result: either generate EXPLAIN output or continue with execution
1181        match optimization_result {
1182            Execution::ExplainPlan {
1183                df_meta,
1184                explain_ctx,
1185                optimizer,
1186                insights_ctx,
1187            } => {
1188                let rows = coord::sequencer::explain_plan_inner(
1189                    session,
1190                    &catalog,
1191                    df_meta,
1192                    explain_ctx,
1193                    optimizer,
1194                    insights_ctx,
1195                )
1196                .await?;
1197
1198                Ok(Some(ExecuteResponse::SendingRowsImmediate {
1199                    rows: Box::new(rows.into_row_iter()),
1200                }))
1201            }
1202            Execution::ExplainPushdown {
1203                imports,
1204                determination,
1205            } => {
1206                // # From peek_explain_pushdown
1207
1208                let as_of = determination.timestamp_context.antichain();
1209                let mz_now = determination
1210                    .timestamp_context
1211                    .timestamp()
1212                    .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1213                    .unwrap_or_else(ResultSpec::value_all);
1214
1215                Ok(Some(
1216                    coord::sequencer::explain_pushdown_future_inner(
1217                        session,
1218                        &*catalog,
1219                        &self.storage_collections,
1220                        as_of,
1221                        mz_now,
1222                        imports,
1223                    )
1224                    .await
1225                    .await?,
1226                ))
1227            }
1228            Execution::Peek {
1229                global_lir_plan,
1230                optimization_finished_at: _optimization_finished_at,
1231                plan_insights_optimizer_trace,
1232                finishing,
1233                copy_to,
1234                insights_ctx,
1235            } => {
1236                // Continue with normal execution
1237                // # From peek_finish
1238
1239                // The typ here was generated from the HIR SQL type and simply stored in LIR.
1240                let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
1241
1242                coord::sequencer::emit_optimizer_notices(
1243                    &*catalog,
1244                    session,
1245                    &df_meta.optimizer_notices,
1246                );
1247
1248                // Generate plan insights notice if needed
1249                if let Some(trace) = plan_insights_optimizer_trace {
1250                    let target_cluster = catalog.get_cluster(target_cluster_id);
1251                    let features = OptimizerFeatures::from(catalog.system_config())
1252                        .override_from(&target_cluster.config.features());
1253                    let insights = trace
1254                        .into_plan_insights(
1255                            &features,
1256                            &catalog.for_session(session),
1257                            Some(finishing.clone()),
1258                            Some(target_cluster),
1259                            df_meta.clone(),
1260                            insights_ctx,
1261                        )
1262                        .await?;
1263                    session.add_notice(AdapterNotice::PlanInsights(insights));
1264                }
1265
1266                // # Now back to peek_finish
1267
1268                let watch_set = statement_logging_id.map(|logging_id| {
1269                    WatchSetCreation::new(
1270                        logging_id,
1271                        catalog.state(),
1272                        &input_id_bundle,
1273                        determination.timestamp_context.timestamp_or_default(),
1274                    )
1275                });
1276
1277                let max_result_size = catalog.system_config().max_result_size();
1278
1279                // Clone determination if we need it for emit_timestamp_notice, since it may be
1280                // moved into Command::ExecuteSlowPathPeek.
1281                let determination_for_notice = if session.vars().emit_timestamp_notice() {
1282                    Some(determination.clone())
1283                } else {
1284                    None
1285                };
1286
1287                let response = match peek_plan {
1288                    PeekPlan::FastPath(fast_path_plan) => {
1289                        if let Some(logging_id) = &statement_logging_id {
1290                            // TODO(peek-seq): Actually, we should log it also for
1291                            // FastPathPlan::Constant. The only reason we are not doing so at the
1292                            // moment is to match the old peek sequencing, so that statement logging
1293                            // tests pass with the frontend peek sequencing turned both on and off.
1294                            //
1295                            // When the old sequencing is removed, we should make a couple of
1296                            // changes in how we log timestamps:
1297                            // - Move this up to just after timestamp determination, so that it
1298                            //   appears in the log as soon as possible.
1299                            // - Do it also for Constant peeks.
1300                            // - Currently, slow-path peeks' timestamp logging is done by
1301                            //   `implement_peek_plan`. We could remove it from there, and just do
1302                            //   it here.
1303                            if !matches!(fast_path_plan, FastPathPlan::Constant(..)) {
1304                                self.log_set_timestamp(
1305                                    *logging_id,
1306                                    determination.timestamp_context.timestamp_or_default(),
1307                                );
1308                            }
1309                        }
1310
1311                        let row_set_finishing_seconds =
1312                            session.metrics().row_set_finishing_seconds().clone();
1313
1314                        let peek_stash_read_batch_size_bytes =
1315                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
1316                                .get(catalog.system_config().dyncfgs());
1317                        let peek_stash_read_memory_budget_bytes =
1318                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
1319                                .get(catalog.system_config().dyncfgs());
1320
1321                        self.implement_fast_path_peek_plan(
1322                            fast_path_plan,
1323                            determination.timestamp_context.timestamp_or_default(),
1324                            finishing,
1325                            target_cluster_id,
1326                            target_replica,
1327                            typ,
1328                            max_result_size,
1329                            max_query_result_size,
1330                            row_set_finishing_seconds,
1331                            read_holds,
1332                            peek_stash_read_batch_size_bytes,
1333                            peek_stash_read_memory_budget_bytes,
1334                            session.conn_id().clone(),
1335                            source_ids,
1336                            watch_set,
1337                        )
1338                        .await?
1339                    }
1340                    PeekPlan::SlowPath(dataflow_plan) => {
1341                        if let Some(logging_id) = &statement_logging_id {
1342                            self.log_set_transient_index_id(*logging_id, dataflow_plan.id);
1343                        }
1344
1345                        self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
1346                            dataflow_plan: Box::new(dataflow_plan),
1347                            determination,
1348                            finishing,
1349                            compute_instance: target_cluster_id,
1350                            target_replica,
1351                            intermediate_result_type: typ,
1352                            source_ids,
1353                            conn_id: session.conn_id().clone(),
1354                            max_result_size,
1355                            max_query_result_size,
1356                            watch_set,
1357                            tx,
1358                        })
1359                        .await?
1360                    }
1361                };
1362
1363                // Add timestamp notice if emit_timestamp_notice is enabled
1364                if let Some(determination) = determination_for_notice {
1365                    let explanation = self
1366                        .call_coordinator(|tx| Command::ExplainTimestamp {
1367                            conn_id: session.conn_id().clone(),
1368                            session_wall_time: session.pcx().wall_time,
1369                            cluster_id: target_cluster_id,
1370                            id_bundle: input_id_bundle.clone(),
1371                            determination,
1372                            tx,
1373                        })
1374                        .await;
1375                    session.add_notice(AdapterNotice::QueryTimestamp { explanation });
1376                }
1377
1378                Ok(Some(match copy_to {
1379                    None => response,
1380                    // COPY TO STDOUT
1381                    Some(format) => ExecuteResponse::CopyTo {
1382                        format,
1383                        resp: Box::new(response),
1384                    },
1385                }))
1386            }
1387            Execution::Subscribe {
1388                subscribe_plan,
1389                df_desc,
1390                df_meta,
1391                optimization_finished_at: _optimization_finished_at,
1392            } => {
1393                if df_desc.as_of.as_ref().expect("as of set") == &df_desc.until {
1394                    session.add_notice(AdapterNotice::EqualSubscribeBounds {
1395                        bound: *df_desc.until.as_option().expect("as of set"),
1396                    });
1397                }
1398                coord::sequencer::emit_optimizer_notices(
1399                    &*catalog,
1400                    session,
1401                    &df_meta.optimizer_notices,
1402                );
1403
1404                let response = self
1405                    .call_coordinator(|tx| Command::ExecuteSubscribe {
1406                        df_desc,
1407                        dependency_ids: subscribe_plan.from.depends_on(),
1408                        cluster_id: target_cluster_id,
1409                        replica_id: target_replica,
1410                        conn_id: session.conn_id().clone(),
1411                        session_uuid: session.uuid(),
1412                        read_holds,
1413                        plan: subscribe_plan,
1414                        statement_logging_id,
1415                        tx,
1416                    })
1417                    .await?;
1418                Ok(Some(response))
1419            }
1420            Execution::CopyToS3 {
1421                global_lir_plan,
1422                source_ids,
1423            } => {
1424                let (df_desc, df_meta) = global_lir_plan.unapply();
1425
1426                coord::sequencer::emit_optimizer_notices(
1427                    &*catalog,
1428                    session,
1429                    &df_meta.optimizer_notices,
1430                );
1431
1432                // Extract S3 sink connection info for preflight check
1433                let sink_id = df_desc.sink_id();
1434                let sinks = &df_desc.sink_exports;
1435                if sinks.len() != 1 {
1436                    return Err(AdapterError::Internal(
1437                        "expected exactly one copy to s3 sink".into(),
1438                    ));
1439                }
1440                let (_, sink_desc) = sinks
1441                    .first_key_value()
1442                    .expect("known to be exactly one copy to s3 sink");
1443                let s3_sink_connection = match &sink_desc.connection {
1444                    mz_compute_types::sinks::ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1445                        conn.clone()
1446                    }
1447                    _ => {
1448                        return Err(AdapterError::Internal(
1449                            "expected copy to s3 oneshot sink".into(),
1450                        ));
1451                    }
1452                };
1453
1454                // Perform S3 preflight check in background task (via coordinator).
1455                // This runs slow S3 operations without blocking the coordinator's main task.
1456                self.call_coordinator(|tx| Command::CopyToPreflight {
1457                    s3_sink_connection,
1458                    sink_id,
1459                    tx,
1460                })
1461                .await?;
1462
1463                // Preflight succeeded, now execute the actual COPY TO dataflow
1464                let watch_set = statement_logging_id.map(|logging_id| {
1465                    WatchSetCreation::new(
1466                        logging_id,
1467                        catalog.state(),
1468                        &input_id_bundle,
1469                        determination.timestamp_context.timestamp_or_default(),
1470                    )
1471                });
1472
1473                let response = self
1474                    .call_coordinator(|tx| Command::ExecuteCopyTo {
1475                        df_desc: Box::new(df_desc),
1476                        compute_instance: target_cluster_id,
1477                        target_replica,
1478                        source_ids,
1479                        conn_id: session.conn_id().clone(),
1480                        watch_set,
1481                        tx,
1482                    })
1483                    .await?;
1484
1485                Ok(Some(response))
1486            }
1487        }
1488    }
1489
1490    /// (Similar to Coordinator::determine_timestamp)
1491    /// Determines the timestamp for a query, acquires read holds that ensure the
1492    /// query remains executable at that time, and returns those.
1493    /// The caller is responsible for eventually dropping those read holds.
1494    ///
1495    /// Note: self is taken &mut because of the lazy fetching in `get_compute_instance_client`.
1496    pub(crate) async fn frontend_determine_timestamp(
1497        &mut self,
1498        session: &Session,
1499        id_bundle: &CollectionIdBundle,
1500        when: &QueryWhen,
1501        compute_instance: ComputeInstanceId,
1502        timeline_context: &TimelineContext,
1503        oracle_read_ts: Option<Timestamp>,
1504        real_time_recency_ts: Option<Timestamp>,
1505    ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
1506        // this is copy-pasted from Coordinator
1507
1508        let isolation_level = session.vars().transaction_isolation();
1509
1510        let (read_holds, upper) = self
1511            .acquire_read_holds_and_least_valid_write(id_bundle)
1512            .await
1513            .map_err(|err| {
1514                AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
1515                    err,
1516                    compute_instance,
1517                )
1518            })?;
1519        let (det, read_holds) = <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1520            session,
1521            id_bundle,
1522            when,
1523            timeline_context,
1524            oracle_read_ts,
1525            real_time_recency_ts,
1526            isolation_level,
1527            read_holds,
1528            upper.clone(),
1529        )?;
1530
1531        session
1532            .metrics()
1533            .determine_timestamp(&[
1534                match det.respond_immediately() {
1535                    true => "true",
1536                    false => "false",
1537                },
1538                isolation_level.as_str(),
1539                &compute_instance.to_string(),
1540            ])
1541            .inc();
1542        if !det.respond_immediately()
1543            && isolation_level == &IsolationLevel::StrictSerializable
1544            && real_time_recency_ts.is_none()
1545        {
1546            // Note down the difference between StrictSerializable and Serializable into a metric.
1547            if let Some(strict) = det.timestamp_context.timestamp() {
1548                let (serializable_det, _tmp_read_holds) =
1549                    <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1550                        session,
1551                        id_bundle,
1552                        when,
1553                        timeline_context,
1554                        oracle_read_ts,
1555                        real_time_recency_ts,
1556                        &IsolationLevel::Serializable,
1557                        read_holds.clone(),
1558                        upper,
1559                    )?;
1560                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
1561                    session
1562                        .metrics()
1563                        .timestamp_difference_for_strict_serializable_ms(&[compute_instance
1564                            .to_string()
1565                            .as_ref()])
1566                        .observe(f64::cast_lossy(u64::from(
1567                            strict.saturating_sub(*serializable),
1568                        )));
1569                }
1570            }
1571        }
1572
1573        Ok((det, read_holds))
1574    }
1575
1576    fn assert_read_holds_correct(
1577        read_holds: &ReadHolds<Timestamp>,
1578        execution: &Execution,
1579        determination: &TimestampDetermination<Timestamp>,
1580        target_cluster_id: ClusterId,
1581        in_immediate_multi_stmt_txn: bool,
1582    ) {
1583        // Extract source_imports, index_imports, as_of, and execution_name based on Execution variant
1584        let (source_imports, index_imports, as_of, execution_name): (
1585            Vec<GlobalId>,
1586            Vec<GlobalId>,
1587            Timestamp,
1588            &str,
1589        ) = match execution {
1590            Execution::Peek {
1591                global_lir_plan, ..
1592            } => match global_lir_plan.peek_plan() {
1593                PeekPlan::FastPath(fast_path_plan) => {
1594                    let (sources, indexes) = match fast_path_plan {
1595                        FastPathPlan::Constant(..) => (vec![], vec![]),
1596                        FastPathPlan::PeekExisting(_coll_id, idx_id, ..) => (vec![], vec![*idx_id]),
1597                        FastPathPlan::PeekPersist(global_id, ..) => (vec![*global_id], vec![]),
1598                    };
1599                    (
1600                        sources,
1601                        indexes,
1602                        determination.timestamp_context.timestamp_or_default(),
1603                        "FastPath",
1604                    )
1605                }
1606                PeekPlan::SlowPath(dataflow_plan) => {
1607                    let as_of = dataflow_plan
1608                        .desc
1609                        .as_of
1610                        .clone()
1611                        .expect("dataflow has an as_of")
1612                        .into_element();
1613                    (
1614                        dataflow_plan.desc.source_imports.keys().cloned().collect(),
1615                        dataflow_plan.desc.index_imports.keys().cloned().collect(),
1616                        as_of,
1617                        "SlowPath",
1618                    )
1619                }
1620            },
1621            Execution::CopyToS3 {
1622                global_lir_plan, ..
1623            } => {
1624                let df_desc = global_lir_plan.df_desc();
1625                let as_of = df_desc
1626                    .as_of
1627                    .clone()
1628                    .expect("dataflow has an as_of")
1629                    .into_element();
1630                (
1631                    df_desc.source_imports.keys().cloned().collect(),
1632                    df_desc.index_imports.keys().cloned().collect(),
1633                    as_of,
1634                    "CopyToS3",
1635                )
1636            }
1637            Execution::ExplainPlan { .. } | Execution::ExplainPushdown { .. } => {
1638                // No read holds assertions needed for EXPLAIN variants
1639                return;
1640            }
1641            Execution::Subscribe { df_desc, .. } => {
1642                let as_of = df_desc
1643                    .as_of
1644                    .clone()
1645                    .expect("dataflow has an as_of")
1646                    .into_element();
1647                (
1648                    df_desc.source_imports.keys().cloned().collect(),
1649                    df_desc.index_imports.keys().cloned().collect(),
1650                    as_of,
1651                    "Subscribe",
1652                )
1653            }
1654        };
1655
1656        // Assert that we have some read holds for all the imports of the dataflow.
1657        for id in source_imports.iter() {
1658            soft_assert_or_log!(
1659                read_holds.storage_holds.contains_key(id),
1660                "[{}] missing read hold for the source import {}; (in_immediate_multi_stmt_txn: {})",
1661                execution_name,
1662                id,
1663                in_immediate_multi_stmt_txn,
1664            );
1665        }
1666        for id in index_imports.iter() {
1667            soft_assert_or_log!(
1668                read_holds
1669                    .compute_ids()
1670                    .map(|(_instance, coll)| coll)
1671                    .contains(id),
1672                "[{}] missing read hold for the index import {}; (in_immediate_multi_stmt_txn: {})",
1673                execution_name,
1674                id,
1675                in_immediate_multi_stmt_txn,
1676            );
1677        }
1678
1679        // Also check the holds against the as_of.
1680        for (id, h) in read_holds.storage_holds.iter() {
1681            soft_assert_or_log!(
1682                h.since().less_equal(&as_of),
1683                "[{}] storage read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1684                execution_name,
1685                h.since(),
1686                id,
1687                as_of,
1688                determination,
1689                in_immediate_multi_stmt_txn,
1690            );
1691        }
1692        for ((instance, id), h) in read_holds.compute_holds.iter() {
1693            soft_assert_eq_or_log!(
1694                *instance,
1695                target_cluster_id,
1696                "[{}] the read hold on {} is on the wrong cluster; (in_immediate_multi_stmt_txn: {})",
1697                execution_name,
1698                id,
1699                in_immediate_multi_stmt_txn,
1700            );
1701            soft_assert_or_log!(
1702                h.since().less_equal(&as_of),
1703                "[{}] compute read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1704                execution_name,
1705                h.since(),
1706                id,
1707                as_of,
1708                determination,
1709                in_immediate_multi_stmt_txn,
1710            );
1711        }
1712    }
1713}
1714
/// Enum for branching among various execution steps after optimization
enum Execution {
    /// A peek (SELECT, or COPY ... TO STDOUT), either fast-path or via a
    /// transient dataflow.
    Peek {
        /// The optimized plan; contains either a fast-path or slow-path peek plan.
        global_lir_plan: optimize::peek::GlobalLirPlan,
        /// When optimization finished (epoch milliseconds).
        optimization_finished_at: EpochMillis,
        /// If present, plan insights are computed from this trace and emitted
        /// to the session as an `AdapterNotice::PlanInsights`.
        plan_insights_optimizer_trace: Option<OptimizerTrace>,
        /// Row ordering/limiting applied to the peek results.
        finishing: RowSetFinishing,
        /// `Some(format)` for COPY ... TO STDOUT: the peek response is wrapped
        /// in `ExecuteResponse::CopyTo`.
        copy_to: Option<plan::CopyFormat>,
        /// Context for computing plan insights, if requested.
        insights_ctx: Option<Box<PlanInsightsContext>>,
    },
    /// A SUBSCRIBE, executed by installing a dataflow via the coordinator.
    Subscribe {
        /// The SUBSCRIBE plan; its `from` supplies the dependency ids.
        subscribe_plan: SubscribePlan,
        /// The fully optimized dataflow to install.
        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
        /// Optimizer metadata; its notices are emitted to the session.
        df_meta: DataflowMetainfo,
        /// When optimization finished (epoch milliseconds).
        /// NOTE(review): currently unused by the Subscribe execution arm.
        optimization_finished_at: EpochMillis,
    },
    /// COPY ... TO an S3 location via a oneshot compute sink.
    CopyToS3 {
        /// The optimized dataflow; expected to contain exactly one
        /// `CopyToS3Oneshot` sink export.
        global_lir_plan: optimize::copy_to::GlobalLirPlan,
        /// IDs of the collections the COPY reads from.
        source_ids: BTreeSet<GlobalId>,
    },
    /// An EXPLAIN of a plan; nothing is executed, so no read-holds assertions
    /// apply (see `assert_read_holds_correct`).
    ExplainPlan {
        df_meta: DataflowMetainfo,
        explain_ctx: ExplainPlanContext,
        optimizer: optimize::peek::Optimizer,
        insights_ctx: Option<Box<PlanInsightsContext>>,
    },
    /// An EXPLAIN of filter pushdown; nothing is executed.
    ExplainPushdown {
        /// Per imported collection, presumably the MapFilterProject whose
        /// pushdown is being explained — TODO confirm against the handler.
        imports: BTreeMap<GlobalId, mz_expr::MapFilterProject>,
        /// The timestamp determination for the query.
        determination: TimestampDetermination<Timestamp>,
    },
}