Skip to main content

mz_adapter/
frontend_peek.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::collections::BTreeSet;
12use std::sync::Arc;
13use std::time::Duration;
14
15use itertools::Itertools;
16use mz_adapter_types::dyncfgs::ENABLE_FRONTEND_SUBSCRIBES;
17use mz_compute_types::ComputeInstanceId;
18use mz_compute_types::dataflows::DataflowDescription;
19use mz_controller_types::ClusterId;
20use mz_expr::{CollectionPlan, ResultSpec, RowSetFinishing};
21use mz_ore::cast::{CastFrom, CastLossy};
22use mz_ore::collections::CollectionExt;
23use mz_ore::now::EpochMillis;
24use mz_ore::task::JoinHandle;
25use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
26use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
27use mz_repr::role_id::RoleId;
28use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
29use mz_sql::ast::Raw;
30use mz_sql::catalog::CatalogCluster;
31use mz_sql::plan::Params;
32use mz_sql::plan::{
33    self, Explainee, ExplaineeStatement, Plan, QueryWhen, SelectPlan, SubscribePlan,
34};
35use mz_sql::rbac;
36use mz_sql::session::metadata::SessionMetadata;
37use mz_sql::session::vars::IsolationLevel;
38use mz_sql_parser::ast::{CopyDirection, ExplainStage, ShowStatement, Statement};
39use mz_transform::EmptyStatisticsOracle;
40use mz_transform::dataflow::DataflowMetainfo;
41use opentelemetry::trace::TraceContextExt;
42use timely::progress::Antichain;
43use tracing::{Span, debug, warn};
44use tracing_opentelemetry::OpenTelemetrySpanExt;
45
46use crate::catalog::Catalog;
47use crate::command::Command;
48use crate::coord::peek::{FastPathPlan, PeekPlan};
49use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle};
50use crate::coord::timeline::timedomain_for;
51use crate::coord::timestamp_selection::TimestampDetermination;
52use crate::coord::{
53    Coordinator, CopyToContext, ExecuteContextGuard, ExplainContext, ExplainPlanContext,
54    TargetCluster,
55};
56use crate::explain::insights::PlanInsightsContext;
57use crate::explain::optimizer_trace::OptimizerTrace;
58use crate::optimize::Optimize;
59use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
60use crate::session::{Session, TransactionOps, TransactionStatus};
61use crate::statement_logging::WatchSetCreation;
62use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
63use crate::{
64    AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, PeekClient, ReadHolds,
65    TimelineContext, TimestampContext, TimestampProvider, optimize,
66};
67use crate::{coord, metrics};
68
69impl PeekClient {
70    /// Attempt to sequence a peek from the session task.
71    ///
72    /// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the
73    /// Coordinator's sequencing. If it returns an error, it should be returned to the user.
74    ///
75    /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
76    /// triggering the execution of the underlying query.
77    pub(crate) async fn try_frontend_peek(
78        &mut self,
79        portal_name: &str,
80        session: &mut Session,
81        outer_ctx_extra: &mut Option<ExecuteContextGuard>,
82    ) -> Result<Option<ExecuteResponse>, AdapterError> {
83        // # From handle_execute
84
85        if session.vars().emit_trace_id_notice() {
86            let span_context = tracing::Span::current()
87                .context()
88                .span()
89                .span_context()
90                .clone();
91            if span_context.is_valid() {
92                session.add_notice(AdapterNotice::QueryTrace {
93                    trace_id: span_context.trace_id(),
94                });
95            }
96        }
97
98        // TODO(peek-seq): This snapshot is wasted when we end up bailing out from the frontend peek
99        // sequencing. We could solve this is with that optimization where we
100        // continuously keep a catalog snapshot in the session, and only get a new one when the
101        // catalog revision has changed, which we could see with an atomic read.
102        // But anyhow, this problem will just go away when we reach the point that we never fall
103        // back to the old sequencing.
104        let catalog = self.catalog_snapshot("try_frontend_peek").await;
105
106        // Extract things from the portal.
107        let (stmt, params, logging, lifecycle_timestamps) = {
108            if let Err(err) = Coordinator::verify_portal(&*catalog, session, portal_name) {
109                outer_ctx_extra
110                    .take()
111                    .and_then(|guard| guard.defuse().retire());
112                return Err(err);
113            }
114            let portal = session
115                .get_portal_unverified(portal_name)
116                // The portal is a session-level thing, so it couldn't have concurrently disappeared
117                // since the above verification.
118                .expect("called verify_portal above");
119            let params = portal.parameters.clone();
120            let stmt = portal.stmt.clone();
121            let logging = Arc::clone(&portal.logging);
122            let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
123            (stmt, params, logging, lifecycle_timestamps)
124        };
125
126        // Before planning, check if this is a statement type we can handle.
127        // This must happen BEFORE statement logging setup to avoid orphaned execution records.
128        if let Some(ref stmt) = stmt {
129            match &**stmt {
130                Statement::Select(_)
131                | Statement::ExplainAnalyzeObject(_)
132                | Statement::ExplainAnalyzeCluster(_)
133                | Statement::Show(ShowStatement::ShowObjects(_))
134                | Statement::Show(ShowStatement::ShowColumns(_)) => {
135                    // These are always fine, just continue.
136                    // Note: EXPLAIN ANALYZE will `plan` to `Plan::Select`.
137                    // Note: ShowObjects plans to `Plan::Select`, ShowColumns plans to `Plan::ShowColumns`.
138                    // We handle `Plan::ShowColumns` specially in `try_frontend_peek_inner`.
139                }
140                Statement::ExplainPlan(explain_stmt) => {
141                    // Only handle ExplainPlan for SELECT statements.
142                    // We don't want to handle e.g. EXPLAIN CREATE MATERIALIZED VIEW here, because that
143                    // requires purification before planning, which the frontend peek sequencing doesn't
144                    // do.
145                    match &explain_stmt.explainee {
146                        mz_sql_parser::ast::Explainee::Select(..) => {
147                            // This is a SELECT, continue
148                        }
149                        _ => {
150                            debug!(
151                                "Bailing out from try_frontend_peek, because EXPLAIN is not for a SELECT query"
152                            );
153                            return Ok(None);
154                        }
155                    }
156                }
157                Statement::ExplainPushdown(explain_stmt) => {
158                    // Only handle EXPLAIN FILTER PUSHDOWN for non-BROKEN SELECT statements
159                    match &explain_stmt.explainee {
160                        mz_sql_parser::ast::Explainee::Select(_, false) => {}
161                        _ => {
162                            debug!(
163                                "Bailing out from try_frontend_peek, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is for EXPLAIN BROKEN"
164                            );
165                            return Ok(None);
166                        }
167                    }
168                }
169                Statement::Copy(copy_stmt) => {
170                    match &copy_stmt.direction {
171                        CopyDirection::To => {
172                            // This is COPY TO (...), continue
173                        }
174                        CopyDirection::From => {
175                            debug!(
176                                "Bailing out from try_frontend_peek, because COPY FROM is not supported"
177                            );
178                            return Ok(None);
179                        }
180                    }
181                }
182
183                Statement::Subscribe(_)
184                    if ENABLE_FRONTEND_SUBSCRIBES.get(catalog.system_config().dyncfgs()) =>
185                {
186                    // We have a subscribe statement to process; continue.
187                }
188                _ => {
189                    debug!(
190                        "Bailing out from try_frontend_peek, because statement type is not supported"
191                    );
192                    return Ok(None);
193                }
194            }
195        }
196
197        // Set up statement logging, and log the beginning of execution.
198        // (But only if we're not executing in the context of another statement.)
199        let statement_logging_id = if outer_ctx_extra.is_none() {
200            // This is a new statement, so begin statement logging
201            let result = self.statement_logging_frontend.begin_statement_execution(
202                session,
203                &params,
204                &logging,
205                catalog.system_config(),
206                lifecycle_timestamps,
207            );
208
209            if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result {
210                self.log_began_execution(began_execution, mseh_update, prepared_statement);
211                Some(logging_id)
212            } else {
213                None
214            }
215        } else {
216            // We're executing in the context of another statement (e.g., FETCH),
217            // so extract the statement logging ID from the outer context if present.
218            // We take ownership and retire the outer context here. The end of execution will be
219            // logged in one of the following ways:
220            // - At the end of this function, if the execution is finished by then.
221            // - Later by the Coordinator, either due to RegisterFrontendPeek or ExecuteSlowPathPeek.
222            outer_ctx_extra
223                .take()
224                .and_then(|guard| guard.defuse().retire())
225        };
226
227        let result = self
228            .try_frontend_peek_inner(session, catalog, stmt, params, statement_logging_id)
229            .await;
230
231        // Log the end of execution if we are logging this statement and execution has already
232        // ended.
233        if let Some(logging_id) = statement_logging_id {
234            let reason = match &result {
235                // Streaming results are handled asynchronously by the coordinator
236                Ok(Some(
237                    ExecuteResponse::SendingRowsStreaming { .. }
238                    | ExecuteResponse::Subscribing { .. },
239                )) => {
240                    // Don't log here - the peek or subscribe is still executing.
241                    // It will be logged when handle_peek_notification is called.
242                    return result;
243                }
244                // COPY TO needs to check its inner response
245                Ok(Some(resp @ ExecuteResponse::CopyTo { resp: inner, .. })) => {
246                    match inner.as_ref() {
247                        ExecuteResponse::SendingRowsStreaming { .. }
248                        | ExecuteResponse::Subscribing { .. } => {
249                            // Don't log here - the peek or subscribe is still executing.
250                            // It will be logged when handle_peek_notification is called.
251                            return result;
252                        }
253                        // For non-streaming COPY TO responses, use the outer CopyTo for conversion
254                        _ => resp.into(),
255                    }
256                }
257                // Bailout case, which should not happen
258                Ok(None) => {
259                    soft_panic_or_log!(
260                        "Bailed out from `try_frontend_peek_inner` after we already logged the beginning of statement execution."
261                    );
262                    // This statement will be handled by the old peek sequencing, which will do its
263                    // own statement logging from the beginning. So, let's close out this one.
264                    self.log_ended_execution(
265                        logging_id,
266                        StatementEndedExecutionReason::Errored {
267                            error: "Internal error: bailed out from `try_frontend_peek_inner`"
268                                .to_string(),
269                        },
270                    );
271                    return result;
272                }
273                // All other success responses - use the From implementation
274                // TODO(peek-seq): After we delete the old peek sequencing, we'll be able to adjust
275                // the From implementation to do exactly what we need in the frontend peek
276                // sequencing, so that the above special cases won't be needed.
277                Ok(Some(resp)) => resp.into(),
278                Err(e) => StatementEndedExecutionReason::Errored {
279                    error: e.to_string(),
280                },
281            };
282
283            self.log_ended_execution(logging_id, reason);
284        }
285
286        result
287    }
288
289    /// This is encapsulated in an inner function so that the outer function can still do statement
290    /// logging after the `?` returns of the inner function.
291    async fn try_frontend_peek_inner(
292        &mut self,
293        session: &mut Session,
294        catalog: Arc<Catalog>,
295        stmt: Option<Arc<Statement<Raw>>>,
296        params: Params,
297        statement_logging_id: Option<crate::statement_logging::StatementLoggingId>,
298    ) -> Result<Option<ExecuteResponse>, AdapterError> {
299        let stmt = match stmt {
300            Some(stmt) => stmt,
301            None => {
302                debug!("try_frontend_peek_inner succeeded on an empty query");
303                return Ok(Some(ExecuteResponse::EmptyQuery));
304            }
305        };
306
307        session
308            .metrics()
309            .query_total(&[
310                metrics::session_type_label_value(session.user()),
311                metrics::statement_type_label_value(&stmt),
312            ])
313            .inc();
314
315        // # From handle_execute_inner
316
317        let conn_catalog = catalog.for_session(session);
318        // (`resolved_ids` should be derivable from `stmt`. If `stmt` is later transformed to
319        // remove/add IDs, then `resolved_ids` should be updated to also remove/add those IDs.)
320        let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?;
321
322        let pcx = session.pcx();
323        let plan = mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, &params, &resolved_ids)?;
324
325        /// What do we do with the result of the select?
326        enum QueryPlan<'a> {
327            Select(&'a SelectPlan),
328            CopyTo(&'a SelectPlan, CopyToContext),
329            Subscribe(&'a SubscribePlan),
330        }
331
332        let (query_plan, explain_ctx) = match &plan {
333            Plan::Select(select_plan) => {
334                let explain_ctx = if session.vars().emit_plan_insights_notice() {
335                    let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
336                    ExplainContext::PlanInsightsNotice(optimizer_trace)
337                } else {
338                    ExplainContext::None
339                };
340                (QueryPlan::Select(select_plan), explain_ctx)
341            }
342            Plan::ShowColumns(show_columns_plan) => {
343                // ShowColumns wraps a SelectPlan, extract it and proceed as normal.
344                (
345                    QueryPlan::Select(&show_columns_plan.select_plan),
346                    ExplainContext::None,
347                )
348            }
349            Plan::ExplainPlan(plan::ExplainPlanPlan {
350                stage,
351                format,
352                config,
353                explainee: Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc }),
354            }) => {
355                // Create OptimizerTrace to collect optimizer plans
356                let optimizer_trace = OptimizerTrace::new(stage.paths());
357                let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
358                    broken: *broken,
359                    config: config.clone(),
360                    format: *format,
361                    stage: *stage,
362                    replan: None,
363                    desc: Some(desc.clone()),
364                    optimizer_trace,
365                });
366                (QueryPlan::Select(plan), explain_ctx)
367            }
368            // COPY TO S3
369            Plan::CopyTo(plan::CopyToPlan {
370                select_plan,
371                desc,
372                to,
373                connection,
374                connection_id,
375                format,
376                max_file_size,
377            }) => {
378                let uri = eval_copy_to_uri(to.clone(), session, catalog.state())?;
379
380                // (output_batch_count will be set later)
381                let copy_to_ctx = CopyToContext {
382                    desc: desc.clone(),
383                    uri,
384                    connection: connection.clone(),
385                    connection_id: *connection_id,
386                    format: format.clone(),
387                    max_file_size: *max_file_size,
388                    output_batch_count: None,
389                };
390
391                (
392                    QueryPlan::CopyTo(select_plan, copy_to_ctx),
393                    ExplainContext::None,
394                )
395            }
396            Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => {
397                // Only handle EXPLAIN FILTER PUSHDOWN for SELECT statements
398                match explainee {
399                    plan::Explainee::Statement(plan::ExplaineeStatement::Select {
400                        broken: false,
401                        plan,
402                        desc: _,
403                    }) => {
404                        let explain_ctx = ExplainContext::Pushdown;
405                        (QueryPlan::Select(plan), explain_ctx)
406                    }
407                    _ => {
408                        // This shouldn't happen because we already checked for this at the AST
409                        // level before calling `try_frontend_peek_inner`.
410                        soft_panic_or_log!(
411                            "unexpected EXPLAIN FILTER PUSHDOWN plan kind in frontend peek sequencing: {:?}",
412                            explainee
413                        );
414                        debug!(
415                            "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN"
416                        );
417                        return Ok(None);
418                    }
419                }
420            }
421            Plan::SideEffectingFunc(sef_plan) => {
422                // Side-effecting functions need Coordinator state (e.g., active_conns),
423                // so delegate to the Coordinator via a Command.
424                // The RBAC check is performed in the Coordinator where active_conns is available.
425                let response = self
426                    .call_coordinator(|tx| Command::ExecuteSideEffectingFunc {
427                        plan: sef_plan.clone(),
428                        conn_id: session.conn_id().clone(),
429                        current_role: session.role_metadata().current_role,
430                        tx,
431                    })
432                    .await?;
433                return Ok(Some(response));
434            }
435            Plan::Subscribe(subscribe) => (QueryPlan::Subscribe(subscribe), ExplainContext::None),
436            _ => {
437                // This shouldn't happen because we already checked for this at the AST
438                // level before calling `try_frontend_peek_inner`.
439                soft_panic_or_log!(
440                    "Unexpected plan kind in frontend peek sequencing: {:?}",
441                    plan
442                );
443                debug!(
444                    "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, side-effecting SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO S3"
445                );
446                return Ok(None);
447            }
448        };
449
450        let when = match query_plan {
451            QueryPlan::Select(s) => &s.when,
452            QueryPlan::CopyTo(s, _) => &s.when,
453            QueryPlan::Subscribe(s) => &s.when,
454        };
455
456        let depends_on = match query_plan {
457            QueryPlan::Select(s) => s.source.depends_on(),
458            QueryPlan::CopyTo(s, _) => s.source.depends_on(),
459            QueryPlan::Subscribe(s) => s.from.depends_on(),
460        };
461
462        let contains_temporal = match query_plan {
463            QueryPlan::Select(s) => s.source.contains_temporal(),
464            QueryPlan::CopyTo(s, _) => s.source.contains_temporal(),
465            QueryPlan::Subscribe(s) => Ok(s.from.contains_temporal()),
466        };
467
468        // # From sequence_plan
469
470        // We have checked the plan kind above.
471        assert!(plan.allowed_in_read_only());
472
473        let (cluster, target_cluster_id, target_cluster_name) = {
474            let target_cluster = match session.transaction().cluster() {
475                // Use the current transaction's cluster.
476                Some(cluster_id) => TargetCluster::Transaction(cluster_id),
477                // If there isn't a current cluster set for a transaction, then try to auto route.
478                None => coord::catalog_serving::auto_run_on_catalog_server(
479                    &conn_catalog,
480                    session,
481                    &plan,
482                ),
483            };
484            let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
485            (cluster, cluster.id, &cluster.name)
486        };
487
488        // Log cluster selection
489        if let Some(logging_id) = &statement_logging_id {
490            self.log_set_cluster(*logging_id, target_cluster_id);
491        }
492
493        coord::catalog_serving::check_cluster_restrictions(
494            target_cluster_name.as_str(),
495            &conn_catalog,
496            &plan,
497        )?;
498
499        rbac::check_plan(
500            &conn_catalog,
501            // We can't look at `active_conns` here, but that's ok, because this case was handled
502            // above already inside `Command::ExecuteSideEffectingFunc`.
503            None::<fn(u32) -> Option<RoleId>>,
504            session,
505            &plan,
506            Some(target_cluster_id),
507            &resolved_ids,
508        )?;
509
510        if let Some((_, wait_future)) =
511            coord::appends::waiting_on_startup_appends(&*catalog, session, &plan)
512        {
513            wait_future.await;
514        }
515
516        let max_query_result_size = Some(session.vars().max_query_result_size());
517
518        // # From sequence_peek
519
520        // # From peek_validate
521
522        let compute_instance_snapshot =
523            ComputeInstanceSnapshot::new_without_collections(cluster.id());
524
525        let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config())
526            .override_from(&catalog.get_cluster(cluster.id()).config.features())
527            .override_from(&explain_ctx);
528
529        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
530            return Err(AdapterError::NoClusterReplicasAvailable {
531                name: cluster.name.clone(),
532                is_managed: cluster.is_managed(),
533            });
534        }
535
536        let (_, view_id) = self.transient_id_gen.allocate_id();
537        let (_, index_id) = self.transient_id_gen.allocate_id();
538
539        let target_replica_name = session.vars().cluster_replica();
540        let mut target_replica = target_replica_name
541            .map(|name| {
542                cluster
543                    .replica_id(name)
544                    .ok_or(AdapterError::UnknownClusterReplica {
545                        cluster_name: cluster.name.clone(),
546                        replica_name: name.to_string(),
547                    })
548            })
549            .transpose()?;
550
551        let source_ids = depends_on;
552        // TODO(peek-seq): validate_timeline_context can be expensive in real scenarios (not in
553        // simple benchmarks), because it traverses transitive dependencies even of indexed views and
554        // materialized views (also traversing their MIR plans).
555        let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?;
556        if matches!(timeline_context, TimelineContext::TimestampIndependent) && contains_temporal? {
557            // If the source IDs are timestamp independent but the query contains temporal functions,
558            // then the timeline context needs to be upgraded to timestamp dependent. This is
559            // required because `source_ids` doesn't contain functions.
560            timeline_context = TimelineContext::TimestampDependent;
561        }
562
563        let notices = coord::sequencer::check_log_reads(
564            &catalog,
565            cluster,
566            &source_ids,
567            &mut target_replica,
568            session.vars(),
569        )?;
570        session.add_notices(notices);
571
572        // # From peek_linearize_timestamp
573
574        let isolation_level = session.vars().transaction_isolation().clone();
575        let timeline = Coordinator::get_timeline(&timeline_context);
576        let needs_linearized_read_ts =
577            Coordinator::needs_linearized_read_ts(&isolation_level, when);
578
579        let oracle_read_ts = match timeline {
580            Some(timeline) if needs_linearized_read_ts => {
581                let oracle = self.ensure_oracle(timeline).await?;
582                let oracle_read_ts = oracle.read_ts().await;
583                Some(oracle_read_ts)
584            }
585            Some(_) | None => None,
586        };
587
588        // # From peek_real_time_recency
589
590        let vars = session.vars();
591        let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
592            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
593            && !session.contains_read_timestamp()
594        {
595            // Only call the coordinator when we actually need real-time recency
596            self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
597                source_ids: source_ids.clone(),
598                real_time_recency_timeout: *vars.real_time_recency_timeout(),
599                tx,
600            })
601            .await?
602        } else {
603            None
604        };
605
606        // # From peek_timestamp_read_hold
607
608        let dataflow_builder =
609            DataflowBuilder::new(catalog.state(), compute_instance_snapshot.clone());
610        let input_id_bundle = dataflow_builder.sufficient_collections(source_ids.clone());
611
612        // ## From sequence_peek_timestamp
613
614        // Warning: This will be false for AS OF queries, even if we are otherwise inside a
615        // multi-statement transaction. (It's also false for FreshestTableWrite, which is currently
616        // only read-then-write queries, which can't be part of multi-statement transactions, so
617        // FreshestTableWrite doesn't matter.)
618        //
619        // TODO(peek-seq): It's not totally clear to me what the intended semantics are for AS OF
620        // queries inside a transaction: We clearly can't use the transaction timestamp, but the old
621        // peek sequencing still does a timedomain validation. The new peek sequencing does not do
622        // timedomain validation for AS OF queries, which seems more natural. But I'm thinking that
623        // it would be the cleanest to just simply disallow AS OF queries inside transactions.
624        let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when)
625            && !matches!(query_plan, QueryPlan::Subscribe { .. });
626
627        // Fetch or generate a timestamp for this query and fetch or acquire read holds.
628        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
629            // Use the transaction's timestamp if it exists and this isn't an AS OF query.
630            // (`in_immediate_multi_stmt_txn` is false for AS OF queries.)
631            Some(
632                determination @ TimestampDetermination {
633                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
634                    ..
635                },
636            ) if in_immediate_multi_stmt_txn => {
637                // This is a subsequent (non-AS OF, non-constant) query in a multi-statement
638                // transaction. We now:
639                // - Validate that the query only accesses collections within the transaction's
640                //   timedomain (which we know from the stored read holds).
641                // - Use the transaction's stored timestamp determination.
642                // - Use the (relevant subset of the) transaction's read holds.
643
644                let txn_read_holds_opt = self
645                    .call_coordinator(|tx| Command::GetTransactionReadHoldsBundle {
646                        conn_id: session.conn_id().clone(),
647                        tx,
648                    })
649                    .await;
650
651                if let Some(txn_read_holds) = txn_read_holds_opt {
652                    let allowed_id_bundle = txn_read_holds.id_bundle();
653                    let outside = input_id_bundle.difference(&allowed_id_bundle);
654
655                    // Queries without a timestamp and timeline can belong to any existing timedomain.
656                    if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
657                        let valid_names =
658                            allowed_id_bundle.resolve_names(&*catalog, session.conn_id());
659                        let invalid_names = outside.resolve_names(&*catalog, session.conn_id());
660                        return Err(AdapterError::RelationOutsideTimeDomain {
661                            relations: invalid_names,
662                            names: valid_names,
663                        });
664                    }
665
666                    // Extract the subset of read holds for the collections this query accesses.
667                    let read_holds = txn_read_holds.subset(&input_id_bundle);
668
669                    (determination, read_holds)
670                } else {
671                    // This should never happen: we're in a subsequent query of a multi-statement
672                    // transaction (we have a transaction timestamp), but the coordinator has no
673                    // transaction read holds stored. This indicates a bug in the transaction
674                    // handling.
675                    return Err(AdapterError::Internal(
676                        "Missing transaction read holds for multi-statement transaction"
677                            .to_string(),
678                    ));
679                }
680            }
681            _ => {
682                // There is no timestamp determination yet for this transaction. Either:
683                // - We are not in a multi-statement transaction.
684                // - This is the first (non-AS OF) query in a multi-statement transaction.
685                // - This is an AS OF query.
686                // - This is a constant query (`TimestampContext::NoTimestamp`).
687
688                let timedomain_bundle;
689                let determine_bundle = if in_immediate_multi_stmt_txn {
690                    // This is the first (non-AS OF) query in a multi-statement transaction.
691                    // Determine a timestamp that will be valid for anything in any schema
692                    // referenced by the first query.
693                    timedomain_bundle = timedomain_for(
694                        &*catalog,
695                        &dataflow_builder,
696                        &source_ids,
697                        &timeline_context,
698                        session.conn_id(),
699                        target_cluster_id,
700                    )?;
701                    &timedomain_bundle
702                } else {
703                    // Simply use the inputs of the current query.
704                    &input_id_bundle
705                };
706                let (determination, read_holds) = self
707                    .frontend_determine_timestamp(
708                        session,
709                        determine_bundle,
710                        when,
711                        target_cluster_id,
712                        &timeline_context,
713                        oracle_read_ts,
714                        real_time_recency_ts,
715                    )
716                    .await?;
717
718                // If this is the first (non-AS OF) query in a multi-statement transaction, store
719                // the read holds in the coordinator, so subsequent queries can validate against
720                // them.
721                if in_immediate_multi_stmt_txn {
722                    self.call_coordinator(|tx| Command::StoreTransactionReadHolds {
723                        conn_id: session.conn_id().clone(),
724                        read_holds: read_holds.clone(),
725                        tx,
726                    })
727                    .await;
728                }
729
730                (determination, read_holds)
731            }
732        };
733
734        {
735            // Assert that we have a read hold for all the collections in our `input_id_bundle`.
736            for id in input_id_bundle.iter() {
737                let s = read_holds.storage_holds.contains_key(&id);
738                let c = read_holds
739                    .compute_ids()
740                    .map(|(_instance, coll)| coll)
741                    .contains(&id);
742                soft_assert_or_log!(
743                    s || c,
744                    "missing read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
745                    id,
746                    in_immediate_multi_stmt_txn,
747                );
748            }
749
750            // Assert that each part of the `input_id_bundle` corresponds to the right part of
751            // `read_holds`.
752            for id in input_id_bundle.storage_ids.iter() {
753                soft_assert_or_log!(
754                    read_holds.storage_holds.contains_key(id),
755                    "missing storage read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
756                    id,
757                    in_immediate_multi_stmt_txn,
758                );
759            }
760            for id in input_id_bundle
761                .compute_ids
762                .iter()
763                .flat_map(|(_instance, colls)| colls)
764            {
765                soft_assert_or_log!(
766                    read_holds
767                        .compute_ids()
768                        .map(|(_instance, coll)| coll)
769                        .contains(id),
770                    "missing compute read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
771                    id,
772                    in_immediate_multi_stmt_txn,
773                );
774            }
775        }
776
777        // (TODO(peek-seq): The below TODO is copied from the old peek sequencing. We should resolve
778        // this when we decide what to with `AS OF` in transactions.)
779        // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
780        // arbitrary and we don't recall why we did it (possibly an error!). Change this to always
781        // set the transaction ops. Decide and document what our policy should be on AS OF queries.
782        // Maybe they shouldn't be allowed in transactions at all because it's hard to explain
783        // what's going on there. This should probably get a small design document.
784
785        // We only track the peeks in the session if the query doesn't use AS
786        // OF or we're inside an explicit transaction. The latter case is
787        // necessary to support PG's `BEGIN` semantics, whose behavior can
788        // depend on whether or not reads have occurred in the txn.
789        let requires_linearization = (&explain_ctx).into();
790        let mut transaction_determination = determination.clone();
791        match query_plan {
792            QueryPlan::Subscribe { .. } => {
793                if when.is_transactional() {
794                    session.add_transaction_ops(TransactionOps::Subscribe)?;
795                }
796            }
797            QueryPlan::Select(..) | QueryPlan::CopyTo(..) => {
798                if when.is_transactional() {
799                    session.add_transaction_ops(TransactionOps::Peeks {
800                        determination: transaction_determination,
801                        cluster_id: target_cluster_id,
802                        requires_linearization,
803                    })?;
804                } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
805                    // If the query uses AS OF, then ignore the timestamp.
806                    transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
807                    session.add_transaction_ops(TransactionOps::Peeks {
808                        determination: transaction_determination,
809                        cluster_id: target_cluster_id,
810                        requires_linearization,
811                    })?;
812                }
813            }
814        }
815
816        // # From peek_optimize
817
818        let stats = statistics_oracle(
819            session,
820            &source_ids,
821            &determination.timestamp_context.antichain(),
822            true,
823            catalog.system_config(),
824            &*self.storage_collections,
825        )
826        .await
827        .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
828
829        // Generate data structures that can be moved to another task where we will perform possibly
830        // expensive optimizations.
831        let timestamp_context = determination.timestamp_context.clone();
832        let session_meta = session.meta();
833        let now = catalog.config().now.clone();
834        let target_cluster_name = target_cluster_name.clone();
835        let needs_plan_insights = explain_ctx.needs_plan_insights();
836        let determination_for_pushdown = if matches!(explain_ctx, ExplainContext::Pushdown) {
837            // This is a hairy data structure, so avoid this clone if we are not in
838            // EXPLAIN FILTER PUSHDOWN.
839            Some(determination.clone())
840        } else {
841            None
842        };
843
844        let span = Span::current();
845
846        // Prepare data for plan insights if needed
847        let catalog_for_insights = if needs_plan_insights {
848            Some(Arc::clone(&catalog))
849        } else {
850            None
851        };
852        let mut compute_instances = BTreeMap::new();
853        if needs_plan_insights {
854            for user_cluster in catalog.user_clusters() {
855                let snapshot = ComputeInstanceSnapshot::new_without_collections(user_cluster.id);
856                compute_instances.insert(user_cluster.name.clone(), snapshot);
857            }
858        }
859
860        let source_ids_for_closure = source_ids.clone();
861
862        let optimization_future: JoinHandle<Result<_, AdapterError>> = match query_plan {
863            QueryPlan::CopyTo(select_plan, mut copy_to_ctx) => {
864                let raw_expr = select_plan.source.clone();
865
866                // COPY TO path: calculate output_batch_count and create copy_to optimizer
867                let worker_counts = cluster.replicas().map(|r| {
868                    let loc = &r.config.location;
869                    loc.workers().unwrap_or_else(|| loc.num_processes())
870                });
871                let max_worker_count = match worker_counts.max() {
872                    Some(count) => u64::cast_from(count),
873                    None => {
874                        return Err(AdapterError::NoClusterReplicasAvailable {
875                            name: cluster.name.clone(),
876                            is_managed: cluster.is_managed(),
877                        });
878                    }
879                };
880                copy_to_ctx.output_batch_count = Some(max_worker_count);
881
882                let mut optimizer = optimize::copy_to::Optimizer::new(
883                    Arc::clone(&catalog),
884                    compute_instance_snapshot,
885                    view_id,
886                    copy_to_ctx,
887                    optimizer_config,
888                    self.optimizer_metrics.clone(),
889                );
890
891                mz_ore::task::spawn_blocking(
892                    || "optimize copy-to",
893                    move || {
894                        span.in_scope(|| {
895                            let _dispatch_guard = explain_ctx.dispatch_guard();
896
897                            // COPY TO path
898                            // HIR ⇒ MIR lowering and MIR optimization (local)
899                            let local_mir_plan =
900                                optimizer.catch_unwind_optimize(raw_expr.clone())?;
901                            // Attach resolved context required to continue the pipeline.
902                            let local_mir_plan = local_mir_plan.resolve(
903                                timestamp_context.clone(),
904                                &session_meta,
905                                stats,
906                            );
907                            // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
908                            let global_lir_plan =
909                                optimizer.catch_unwind_optimize(local_mir_plan)?;
910                            Ok(Execution::CopyToS3 {
911                                global_lir_plan,
912                                source_ids: source_ids_for_closure,
913                            })
914                        })
915                    },
916                )
917            }
918            QueryPlan::Select(select_plan) => {
919                let select_plan = select_plan.clone();
920                let raw_expr = select_plan.source.clone();
921
922                // SELECT/EXPLAIN path: create peek optimizer
923                let mut optimizer = optimize::peek::Optimizer::new(
924                    Arc::clone(&catalog),
925                    compute_instance_snapshot,
926                    select_plan.finishing.clone(),
927                    view_id,
928                    index_id,
929                    optimizer_config,
930                    self.optimizer_metrics.clone(),
931                );
932
933                mz_ore::task::spawn_blocking(
934                    || "optimize peek",
935                    move || {
936                        span.in_scope(|| {
937                            let _dispatch_guard = explain_ctx.dispatch_guard();
938
939                            // SELECT/EXPLAIN path
940                            // HIR ⇒ MIR lowering and MIR optimization (local)
941
942                            // The purpose of wrapping the following in a closure is to control where the
943                            // `?`s return from, so that even when a `catch_unwind_optimize` call fails,
944                            // we can still handle `EXPLAIN BROKEN`.
945                            let pipeline = || {
946                                let local_mir_plan =
947                                    optimizer.catch_unwind_optimize(raw_expr.clone())?;
948                                // Attach resolved context required to continue the pipeline.
949                                let local_mir_plan = local_mir_plan.resolve(
950                                    timestamp_context.clone(),
951                                    &session_meta,
952                                    stats,
953                                );
954                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
955                                let global_lir_plan =
956                                    optimizer.catch_unwind_optimize(local_mir_plan)?;
957                                Ok::<_, AdapterError>(global_lir_plan)
958                            };
959
960                            let global_lir_plan_result = pipeline();
961                            let optimization_finished_at = now();
962
963                            let create_insights_ctx =
964                                |optimizer: &optimize::peek::Optimizer,
965                                 is_notice: bool|
966                                 -> Option<Box<PlanInsightsContext>> {
967                                    if !needs_plan_insights {
968                                        return None;
969                                    }
970
971                                    let catalog = catalog_for_insights.as_ref()?;
972
973                                    let enable_re_optimize = if needs_plan_insights {
974                                        // Disable any plan insights that use the optimizer if we only want the
975                                        // notice and plan optimization took longer than the threshold. This is
976                                        // to prevent a situation where optimizing takes a while and there are
977                                        // lots of clusters, which would delay peek execution by the product of
978                                        // those.
979                                        //
980                                        // (This heuristic doesn't work well, see #9492.)
981                                        let dyncfgs = catalog.system_config().dyncfgs();
982                                        let opt_limit = mz_adapter_types::dyncfgs
983                                        ::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
984                                            .get(dyncfgs);
985                                        !(is_notice && optimizer.duration() > opt_limit)
986                                    } else {
987                                        false
988                                    };
989
990                                    Some(Box::new(PlanInsightsContext {
991                                        stmt: select_plan
992                                            .select
993                                            .as_deref()
994                                            .map(Clone::clone)
995                                            .map(Statement::Select),
996                                        raw_expr: raw_expr.clone(),
997                                        catalog: Arc::clone(catalog),
998                                        compute_instances,
999                                        target_instance: target_cluster_name,
1000                                        metrics: optimizer.metrics().clone(),
1001                                        finishing: optimizer.finishing().clone(),
1002                                        optimizer_config: optimizer.config().clone(),
1003                                        session: session_meta,
1004                                        timestamp_context,
1005                                        view_id: optimizer.select_id(),
1006                                        index_id: optimizer.index_id(),
1007                                        enable_re_optimize,
1008                                    }))
1009                                };
1010
1011                            let global_lir_plan = match global_lir_plan_result {
1012                                Ok(plan) => plan,
1013                                Err(err) => {
1014                                    let result = if let ExplainContext::Plan(explain_ctx) =
1015                                        explain_ctx
1016                                        && explain_ctx.broken
1017                                    {
1018                                        // EXPLAIN BROKEN: log error and continue with defaults
1019                                        tracing::error!(
1020                                            "error while handling EXPLAIN statement: {}",
1021                                            err
1022                                        );
1023                                        Ok(Execution::ExplainPlan {
1024                                            df_meta: Default::default(),
1025                                            explain_ctx,
1026                                            optimizer,
1027                                            insights_ctx: None,
1028                                        })
1029                                    } else {
1030                                        Err(err)
1031                                    };
1032                                    return result;
1033                                }
1034                            };
1035
1036                            match explain_ctx {
1037                                ExplainContext::Plan(explain_ctx) => {
1038                                    let (_, df_meta, _) = global_lir_plan.unapply();
1039                                    let insights_ctx = create_insights_ctx(&optimizer, false);
1040                                    Ok(Execution::ExplainPlan {
1041                                        df_meta,
1042                                        explain_ctx,
1043                                        optimizer,
1044                                        insights_ctx,
1045                                    })
1046                                }
1047                                ExplainContext::None => Ok(Execution::Peek {
1048                                    global_lir_plan,
1049                                    optimization_finished_at,
1050                                    plan_insights_optimizer_trace: None,
1051                                    finishing: select_plan.finishing,
1052                                    copy_to: select_plan.copy_to,
1053                                    insights_ctx: None,
1054                                }),
1055                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
1056                                    let insights_ctx = create_insights_ctx(&optimizer, true);
1057                                    Ok(Execution::Peek {
1058                                        global_lir_plan,
1059                                        optimization_finished_at,
1060                                        plan_insights_optimizer_trace: Some(optimizer_trace),
1061                                        finishing: select_plan.finishing,
1062                                        copy_to: select_plan.copy_to,
1063                                        insights_ctx,
1064                                    })
1065                                }
1066                                ExplainContext::Pushdown => {
1067                                    let (plan, _, _) = global_lir_plan.unapply();
1068                                    let imports = match plan {
1069                                        PeekPlan::SlowPath(plan) => plan
1070                                            .desc
1071                                            .source_imports
1072                                            .into_iter()
1073                                            .filter_map(|(id, import)| {
1074                                                import.desc.arguments.operators.map(|mfp| (id, mfp))
1075                                            })
1076                                            .collect(),
1077                                        PeekPlan::FastPath(_) => {
1078                                            std::collections::BTreeMap::default()
1079                                        }
1080                                    };
1081                                    Ok(Execution::ExplainPushdown {
1082                                        imports,
1083                                        determination: determination_for_pushdown
1084                                            .expect("it's present for the ExplainPushdown case"),
1085                                    })
1086                                }
1087                            }
1088                        })
1089                    },
1090                )
1091            }
1092            QueryPlan::Subscribe(plan) => {
1093                let plan = plan.clone();
1094                let catalog: Arc<Catalog> = Arc::clone(&catalog);
1095                let debug_name = format!("subscribe-{}", index_id);
1096                let mut optimizer = optimize::subscribe::Optimizer::new(
1097                    catalog,
1098                    compute_instance_snapshot.clone(),
1099                    view_id,
1100                    index_id,
1101                    plan.with_snapshot,
1102                    plan.up_to,
1103                    debug_name,
1104                    optimizer_config,
1105                    self.optimizer_metrics.clone(),
1106                );
1107                mz_ore::task::spawn_blocking(
1108                    || "optimize subscribe",
1109                    move || {
1110                        span.in_scope(|| {
1111                            let _dispatch_guard = explain_ctx.dispatch_guard();
1112
1113                            let global_mir_plan = optimizer.catch_unwind_optimize(plan.clone())?;
1114                            let as_of = timestamp_context.timestamp_or_default();
1115
1116                            if let Some(up_to) = optimizer.up_to() {
1117                                if as_of > up_to {
1118                                    return Err(AdapterError::AbsurdSubscribeBounds {
1119                                        as_of,
1120                                        up_to,
1121                                    });
1122                                }
1123                            }
1124                            let local_mir_plan =
1125                                global_mir_plan.resolve(Antichain::from_elem(as_of));
1126
1127                            let global_lir_plan =
1128                                optimizer.catch_unwind_optimize(local_mir_plan)?;
1129                            let optimization_finished_at = now();
1130
1131                            let (df_desc, df_meta) = global_lir_plan.unapply();
1132                            Ok(Execution::Subscribe {
1133                                subscribe_plan: plan,
1134                                df_desc,
1135                                df_meta,
1136                                optimization_finished_at,
1137                            })
1138                        })
1139                    },
1140                )
1141            }
1142        };
1143
1144        let mut optimization_timeout = *session.vars().statement_timeout();
1145        // Timeout of 0 is equivalent to "off", meaning we will wait "forever."
1146        if optimization_timeout == Duration::ZERO {
1147            optimization_timeout = Duration::MAX;
1148        }
1149        let optimization_result =
1150            // Note: spawn_blocking tasks cannot be cancelled, so on timeout we stop waiting but the
1151            // optimization task continues running in the background until completion. See
1152            // https://github.com/MaterializeInc/database-issues/issues/8644 for properly cancelling
1153            // optimizer runs.
1154            match tokio::time::timeout(optimization_timeout, optimization_future).await {
1155                Ok(Ok(result)) => result,
1156                Ok(Err(AdapterError::Optimizer(err))) => {
1157                    return Err(AdapterError::Internal(format!(
1158                        "internal error in optimizer: {}",
1159                        err
1160                    )));
1161                }
1162                Ok(Err(err)) => {
1163                    return Err(err);
1164                }
1165                Err(_elapsed) => {
1166                    warn!("optimize peek timed out after {:?}", optimization_timeout);
1167                    return Err(AdapterError::StatementTimeout);
1168                }
1169            };
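        // A minimal sketch of the pattern above (names are illustrative only), showing why a
        // timeout does not cancel the work: the blocking task keeps running after we stop
        // waiting for its handle.
        //
        //     let handle = mz_ore::task::spawn_blocking(|| "work", || heavy_work());
        //     match tokio::time::timeout(limit, handle).await {
        //         Ok(joined) => { /* the task finished within the limit */ }
        //         Err(_elapsed) => { /* stop waiting; `heavy_work` still runs to completion */ }
        //     }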
1170
1171        // Log optimization finished
1172        if let Some(logging_id) = &statement_logging_id {
1173            self.log_lifecycle_event(*logging_id, StatementLifecycleEvent::OptimizationFinished);
1174        }
1175
1176        // Assert that read holds are correct for the execution plan
1177        Self::assert_read_holds_correct(
1178            &read_holds,
1179            &optimization_result,
1180            &determination,
1181            target_cluster_id,
1182            in_immediate_multi_stmt_txn,
1183        );
1184
1185        // Handle the optimization result: either generate EXPLAIN output or continue with execution
1186        match optimization_result {
1187            Execution::ExplainPlan {
1188                df_meta,
1189                explain_ctx,
1190                optimizer,
1191                insights_ctx,
1192            } => {
1193                let rows = coord::sequencer::explain_plan_inner(
1194                    session,
1195                    &catalog,
1196                    df_meta,
1197                    explain_ctx,
1198                    optimizer,
1199                    insights_ctx,
1200                )
1201                .await?;
1202
1203                Ok(Some(ExecuteResponse::SendingRowsImmediate {
1204                    rows: Box::new(rows.into_row_iter()),
1205                }))
1206            }
1207            Execution::ExplainPushdown {
1208                imports,
1209                determination,
1210            } => {
1211                // # From peek_explain_pushdown
1212
1213                let as_of = determination.timestamp_context.antichain();
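                // Pin `mz_now()` to the determined query timestamp when one exists; otherwise
                // leave it unconstrained for the pushdown analysis.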
1214                let mz_now = determination
1215                    .timestamp_context
1216                    .timestamp()
1217                    .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1218                    .unwrap_or_else(ResultSpec::value_all);
1219
1220                Ok(Some(
1221                    coord::sequencer::explain_pushdown_future_inner(
1222                        session,
1223                        &*catalog,
1224                        &self.storage_collections,
1225                        as_of,
1226                        mz_now,
1227                        imports,
1228                    )
1229                    .await
1230                    .await?,
1231                ))
1232            }
1233            Execution::Peek {
1234                global_lir_plan,
1235                optimization_finished_at: _optimization_finished_at,
1236                plan_insights_optimizer_trace,
1237                finishing,
1238                copy_to,
1239                insights_ctx,
1240            } => {
1241                // Continue with normal execution
1242                // # From peek_finish
1243
1244                // The typ here was generated from the HIR SQL type and simply stored in LIR.
1245                let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
1246
1247                coord::sequencer::emit_optimizer_notices(
1248                    &*catalog,
1249                    session,
1250                    &df_meta.optimizer_notices,
1251                );
1252
1253                // Generate plan insights notice if needed
1254                if let Some(trace) = plan_insights_optimizer_trace {
1255                    let target_cluster = catalog.get_cluster(target_cluster_id);
1256                    let features = OptimizerFeatures::from(catalog.system_config())
1257                        .override_from(&target_cluster.config.features());
1258                    let insights = trace
1259                        .into_plan_insights(
1260                            &features,
1261                            &catalog.for_session(session),
1262                            Some(finishing.clone()),
1263                            Some(target_cluster),
1264                            df_meta.clone(),
1265                            insights_ctx,
1266                        )
1267                        .await?;
1268                    session.add_notice(AdapterNotice::PlanInsights(insights));
1269                }
1270
1271                // # Now back to peek_finish
1272
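                // If statement logging is enabled, create a watch set over the query's input
                // collections at the chosen timestamp, for statement lifecycle logging.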
1273                let watch_set = statement_logging_id.map(|logging_id| {
1274                    WatchSetCreation::new(
1275                        logging_id,
1276                        catalog.state(),
1277                        &input_id_bundle,
1278                        determination.timestamp_context.timestamp_or_default(),
1279                    )
1280                });
1281
1282                let max_result_size = catalog.system_config().max_result_size();
1283
1284                // Clone determination if we need it for emit_timestamp_notice, since it may be
1285                // moved into Command::ExecuteSlowPathPeek.
1286                let determination_for_notice = if session.vars().emit_timestamp_notice() {
1287                    Some(determination.clone())
1288                } else {
1289                    None
1290                };
1291
1292                let response = match peek_plan {
1293                    PeekPlan::FastPath(fast_path_plan) => {
1294                        if let Some(logging_id) = &statement_logging_id {
1295                            // TODO(peek-seq): Actually, we should log it also for
1296                            // FastPathPlan::Constant. The only reason we are not doing so at the
1297                            // moment is to match the old peek sequencing, so that statement logging
1298                            // tests pass with the frontend peek sequencing turned both on and off.
1299                            //
1300                            // When the old sequencing is removed, we should make a couple of
1301                            // changes in how we log timestamps:
1302                            // - Move this up to just after timestamp determination, so that it
1303                            //   appears in the log as soon as possible.
1304                            // - Do it also for Constant peeks.
1305                            // - Currently, slow-path peeks' timestamp logging is done by
1306                            //   `implement_peek_plan`. We could remove it from there, and just do
1307                            //   it here.
1308                            if !matches!(fast_path_plan, FastPathPlan::Constant(..)) {
1309                                self.log_set_timestamp(
1310                                    *logging_id,
1311                                    determination.timestamp_context.timestamp_or_default(),
1312                                );
1313                            }
1314                        }
1315
1316                        let row_set_finishing_seconds =
1317                            session.metrics().row_set_finishing_seconds().clone();
1318
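                        // Budget knobs for reading back large peek results from the peek
                        // response stash rather than receiving them inline.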
1319                        let peek_stash_read_batch_size_bytes =
1320                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
1321                                .get(catalog.system_config().dyncfgs());
1322                        let peek_stash_read_memory_budget_bytes =
1323                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
1324                                .get(catalog.system_config().dyncfgs());
1325
1326                        self.implement_fast_path_peek_plan(
1327                            fast_path_plan,
1328                            determination.timestamp_context.timestamp_or_default(),
1329                            finishing,
1330                            target_cluster_id,
1331                            target_replica,
1332                            typ,
1333                            max_result_size,
1334                            max_query_result_size,
1335                            row_set_finishing_seconds,
1336                            read_holds,
1337                            peek_stash_read_batch_size_bytes,
1338                            peek_stash_read_memory_budget_bytes,
1339                            session.conn_id().clone(),
1340                            source_ids,
1341                            watch_set,
1342                        )
1343                        .await?
1344                    }
1345                    PeekPlan::SlowPath(dataflow_plan) => {
1346                        if let Some(logging_id) = &statement_logging_id {
1347                            self.log_set_transient_index_id(*logging_id, dataflow_plan.id);
1348                        }
1349
1350                        self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
1351                            dataflow_plan: Box::new(dataflow_plan),
1352                            determination,
1353                            finishing,
1354                            compute_instance: target_cluster_id,
1355                            target_replica,
1356                            intermediate_result_type: typ,
1357                            source_ids,
1358                            conn_id: session.conn_id().clone(),
1359                            max_result_size,
1360                            max_query_result_size,
1361                            watch_set,
1362                            tx,
1363                        })
1364                        .await?
1365                    }
1366                };
1367
1368                // Add timestamp notice if emit_timestamp_notice is enabled
1369                if let Some(determination) = determination_for_notice {
1370                    let explanation = self
1371                        .call_coordinator(|tx| Command::ExplainTimestamp {
1372                            conn_id: session.conn_id().clone(),
1373                            session_wall_time: session.pcx().wall_time,
1374                            cluster_id: target_cluster_id,
1375                            id_bundle: input_id_bundle.clone(),
1376                            determination,
1377                            tx,
1378                        })
1379                        .await;
1380                    session.add_notice(AdapterNotice::QueryTimestamp { explanation });
1381                }
1382
1383                Ok(Some(match copy_to {
1384                    None => response,
1385                    // COPY TO STDOUT
1386                    Some(format) => ExecuteResponse::CopyTo {
1387                        format,
1388                        resp: Box::new(response),
1389                    },
1390                }))
1391            }
1392            Execution::Subscribe {
1393                subscribe_plan,
1394                df_desc,
1395                df_meta,
1396                optimization_finished_at: _optimization_finished_at,
1397            } => {
1398                if df_desc.as_of.as_ref().expect("as of set") == &df_desc.until {
1399                    session.add_notice(AdapterNotice::EqualSubscribeBounds {
1400                        bound: *df_desc.until.as_option().expect("until is non-empty since it equals the as_of"),
1401                    });
1402                }
1403                coord::sequencer::emit_optimizer_notices(
1404                    &*catalog,
1405                    session,
1406                    &df_meta.optimizer_notices,
1407                );
1408
1409                let response = self
1410                    .call_coordinator(|tx| Command::ExecuteSubscribe {
1411                        df_desc,
1412                        dependency_ids: subscribe_plan.from.depends_on(),
1413                        cluster_id: target_cluster_id,
1414                        replica_id: target_replica,
1415                        conn_id: session.conn_id().clone(),
1416                        session_uuid: session.uuid(),
1417                        read_holds,
1418                        plan: subscribe_plan,
1419                        statement_logging_id,
1420                        tx,
1421                    })
1422                    .await?;
1423                Ok(Some(response))
1424            }
1425            Execution::CopyToS3 {
1426                global_lir_plan,
1427                source_ids,
1428            } => {
1429                let (df_desc, df_meta) = global_lir_plan.unapply();
1430
1431                coord::sequencer::emit_optimizer_notices(
1432                    &*catalog,
1433                    session,
1434                    &df_meta.optimizer_notices,
1435                );
1436
1437                // Extract S3 sink connection info for preflight check
1438                let sink_id = df_desc.sink_id();
1439                let sinks = &df_desc.sink_exports;
1440                if sinks.len() != 1 {
1441                    return Err(AdapterError::Internal(
1442                        "expected exactly one copy to s3 sink".into(),
1443                    ));
1444                }
1445                let (_, sink_desc) = sinks
1446                    .first_key_value()
1447                    .expect("known to be exactly one copy to s3 sink");
1448                let s3_sink_connection = match &sink_desc.connection {
1449                    mz_compute_types::sinks::ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1450                        conn.clone()
1451                    }
1452                    _ => {
1453                        return Err(AdapterError::Internal(
1454                            "expected copy to s3 oneshot sink".into(),
1455                        ));
1456                    }
1457                };
1458
1459                // Perform the S3 preflight check in a background task (via the coordinator).
1460                // This runs slow S3 operations without blocking the coordinator's main task.
1461                self.call_coordinator(|tx| Command::CopyToPreflight {
1462                    s3_sink_connection,
1463                    sink_id,
1464                    tx,
1465                })
1466                .await?;
1467
1468                // Preflight succeeded; now execute the actual COPY TO dataflow.
1469                let watch_set = statement_logging_id.map(|logging_id| {
1470                    WatchSetCreation::new(
1471                        logging_id,
1472                        catalog.state(),
1473                        &input_id_bundle,
1474                        determination.timestamp_context.timestamp_or_default(),
1475                    )
1476                });
1477
1478                let response = self
1479                    .call_coordinator(|tx| Command::ExecuteCopyTo {
1480                        df_desc: Box::new(df_desc),
1481                        compute_instance: target_cluster_id,
1482                        target_replica,
1483                        source_ids,
1484                        conn_id: session.conn_id().clone(),
1485                        watch_set,
1486                        tx,
1487                    })
1488                    .await?;
1489
1490                Ok(Some(response))
1491            }
1492        }
1493    }
1494
1495    /// Determines the timestamp for a query (similar to `Coordinator::determine_timestamp`),
1496    /// acquires read holds that ensure the query remains executable at that time, and
1497    /// returns both the determination and those holds.
1498    /// The caller is responsible for eventually dropping the returned read holds.
1499    ///
1500    /// Note: `self` is taken as `&mut` because of the lazy fetching in `get_compute_instance_client`.
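    ///
    /// A minimal usage sketch (not a doctest; the surrounding bindings are illustrative):
    ///
    /// ```ignore
    /// let (determination, read_holds) = peek_client
    ///     .frontend_determine_timestamp(
    ///         &session,
    ///         &id_bundle,
    ///         &QueryWhen::FreshestTableWrite,
    ///         cluster_id,
    ///         &timeline_context,
    ///         oracle_read_ts,
    ///         None, // no real-time recency timestamp
    ///     )
    ///     .await?;
    /// // ... execute the peek at `determination.timestamp_context` ...
    /// drop(read_holds); // release the holds once the peek no longer needs them
    /// ```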
1501    pub(crate) async fn frontend_determine_timestamp(
1502        &mut self,
1503        session: &Session,
1504        id_bundle: &CollectionIdBundle,
1505        when: &QueryWhen,
1506        compute_instance: ComputeInstanceId,
1507        timeline_context: &TimelineContext,
1508        oracle_read_ts: Option<Timestamp>,
1509        real_time_recency_ts: Option<Timestamp>,
1510    ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
1511        // This logic is copy-pasted from the Coordinator's timestamp determination.
1512
1513        let isolation_level = session.vars().transaction_isolation();
1514
1515        let (read_holds, upper) = self
1516            .acquire_read_holds_and_least_valid_write(id_bundle)
1517            .await
1518            .map_err(|err| {
1519                AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
1520                    err,
1521                    compute_instance,
1522                )
1523            })?;
1524        let (det, read_holds) = <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1525            session,
1526            id_bundle,
1527            when,
1528            timeline_context,
1529            oracle_read_ts,
1530            real_time_recency_ts,
1531            isolation_level,
1532            read_holds,
1533            upper.clone(),
1534        )?;
1535
1536        session
1537            .metrics()
1538            .determine_timestamp(&[
1539                match det.respond_immediately() {
1540                    true => "true",
1541                    false => "false",
1542                },
1543                isolation_level.as_str(),
1544                &compute_instance.to_string(),
1545            ])
1546            .inc();
1547        if !det.respond_immediately()
1548            && isolation_level == &IsolationLevel::StrictSerializable
1549            && real_time_recency_ts.is_none()
1550        {
1551            // Record the difference between the StrictSerializable and Serializable timestamps in a metric.
1552            if let Some(strict) = det.timestamp_context.timestamp() {
1553                let (serializable_det, _tmp_read_holds) =
1554                    <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1555                        session,
1556                        id_bundle,
1557                        when,
1558                        timeline_context,
1559                        oracle_read_ts,
1560                        real_time_recency_ts,
1561                        &IsolationLevel::Serializable,
1562                        read_holds.clone(),
1563                        upper,
1564                    )?;
1565                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
1566                    session
1567                        .metrics()
1568                        .timestamp_difference_for_strict_serializable_ms(&[compute_instance
1569                            .to_string()
1570                            .as_ref()])
1571                        .observe(f64::cast_lossy(u64::from(
1572                            strict.saturating_sub(*serializable),
1573                        )));
1574                }
1575            }
1576        }
1577
1578        Ok((det, read_holds))
1579    }
1580
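    /// Soft-asserts that the acquired read holds cover the plan we are about to execute:
    /// there is a hold for every storage and compute import, each compute hold is on the
    /// target cluster, and every hold's `since` is not beyond the plan's `as_of`. Violations
    /// are reported via soft assertions rather than failing the query.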
1581    fn assert_read_holds_correct(
1582        read_holds: &ReadHolds,
1583        execution: &Execution,
1584        determination: &TimestampDetermination,
1585        target_cluster_id: ClusterId,
1586        in_immediate_multi_stmt_txn: bool,
1587    ) {
1588        // Extract source_imports, index_imports, as_of, and execution_name based on the Execution variant.
1589        let (source_imports, index_imports, as_of, execution_name): (
1590            Vec<GlobalId>,
1591            Vec<GlobalId>,
1592            Timestamp,
1593            &str,
1594        ) = match execution {
1595            Execution::Peek {
1596                global_lir_plan, ..
1597            } => match global_lir_plan.peek_plan() {
1598                PeekPlan::FastPath(fast_path_plan) => {
1599                    let (sources, indexes) = match fast_path_plan {
1600                        FastPathPlan::Constant(..) => (vec![], vec![]),
1601                        FastPathPlan::PeekExisting(_coll_id, idx_id, ..) => (vec![], vec![*idx_id]),
1602                        FastPathPlan::PeekPersist(global_id, ..) => (vec![*global_id], vec![]),
1603                    };
1604                    (
1605                        sources,
1606                        indexes,
1607                        determination.timestamp_context.timestamp_or_default(),
1608                        "FastPath",
1609                    )
1610                }
1611                PeekPlan::SlowPath(dataflow_plan) => {
1612                    let as_of = dataflow_plan
1613                        .desc
1614                        .as_of
1615                        .clone()
1616                        .expect("dataflow has an as_of")
1617                        .into_element();
1618                    (
1619                        dataflow_plan.desc.source_imports.keys().cloned().collect(),
1620                        dataflow_plan.desc.index_imports.keys().cloned().collect(),
1621                        as_of,
1622                        "SlowPath",
1623                    )
1624                }
1625            },
1626            Execution::CopyToS3 {
1627                global_lir_plan, ..
1628            } => {
1629                let df_desc = global_lir_plan.df_desc();
1630                let as_of = df_desc
1631                    .as_of
1632                    .clone()
1633                    .expect("dataflow has an as_of")
1634                    .into_element();
1635                (
1636                    df_desc.source_imports.keys().cloned().collect(),
1637                    df_desc.index_imports.keys().cloned().collect(),
1638                    as_of,
1639                    "CopyToS3",
1640                )
1641            }
1642            Execution::ExplainPlan { .. } | Execution::ExplainPushdown { .. } => {
1643                // No read hold assertions are needed for the EXPLAIN variants.
1644                return;
1645            }
1646            Execution::Subscribe { df_desc, .. } => {
1647                let as_of = df_desc
1648                    .as_of
1649                    .clone()
1650                    .expect("dataflow has an as_of")
1651                    .into_element();
1652                (
1653                    df_desc.source_imports.keys().cloned().collect(),
1654                    df_desc.index_imports.keys().cloned().collect(),
1655                    as_of,
1656                    "Subscribe",
1657                )
1658            }
1659        };
1660
1661        // Assert that we hold read holds for every import of the dataflow.
1662        for id in source_imports.iter() {
1663            soft_assert_or_log!(
1664                read_holds.storage_holds.contains_key(id),
1665                "[{}] missing read hold for the source import {}; (in_immediate_multi_stmt_txn: {})",
1666                execution_name,
1667                id,
1668                in_immediate_multi_stmt_txn,
1669            );
1670        }
1671        for id in index_imports.iter() {
1672            soft_assert_or_log!(
1673                read_holds
1674                    .compute_ids()
1675                    .map(|(_instance, coll)| coll)
1676                    .contains(id),
1677                "[{}] missing read hold for the index import {}; (in_immediate_multi_stmt_txn: {})",
1678                execution_name,
1679                id,
1680                in_immediate_multi_stmt_txn,
1681            );
1682        }
1683
1684        // Also check the holds against the as_of.
1685        for (id, h) in read_holds.storage_holds.iter() {
1686            soft_assert_or_log!(
1687                h.since().less_equal(&as_of),
1688                "[{}] storage read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1689                execution_name,
1690                h.since(),
1691                id,
1692                as_of,
1693                determination,
1694                in_immediate_multi_stmt_txn,
1695            );
1696        }
1697        for ((instance, id), h) in read_holds.compute_holds.iter() {
1698            soft_assert_eq_or_log!(
1699                *instance,
1700                target_cluster_id,
1701                "[{}] the read hold on {} is on the wrong cluster; (in_immediate_multi_stmt_txn: {})",
1702                execution_name,
1703                id,
1704                in_immediate_multi_stmt_txn,
1705            );
1706            soft_assert_or_log!(
1707                h.since().less_equal(&as_of),
1708                "[{}] compute read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1709                execution_name,
1710                h.since(),
1711                id,
1712                as_of,
1713                determination,
1714                in_immediate_multi_stmt_txn,
1715            );
1716        }
1717    }
1718}
1719
1720/// Enum for branching among various execution steps after optimization
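/// `Peek`, `Subscribe`, and `CopyToS3` carry the optimized plans to run, while the two
/// `Explain*` variants short-circuit into producing EXPLAIN output instead of executing a
/// dataflow.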
1721enum Execution {
1722    Peek {
1723        global_lir_plan: optimize::peek::GlobalLirPlan,
1724        optimization_finished_at: EpochMillis,
1725        plan_insights_optimizer_trace: Option<OptimizerTrace>,
1726        finishing: RowSetFinishing,
1727        copy_to: Option<plan::CopyFormat>,
1728        insights_ctx: Option<Box<PlanInsightsContext>>,
1729    },
1730    Subscribe {
1731        subscribe_plan: SubscribePlan,
1732        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
1733        df_meta: DataflowMetainfo,
1734        optimization_finished_at: EpochMillis,
1735    },
1736    CopyToS3 {
1737        global_lir_plan: optimize::copy_to::GlobalLirPlan,
1738        source_ids: BTreeSet<GlobalId>,
1739    },
1740    ExplainPlan {
1741        df_meta: DataflowMetainfo,
1742        explain_ctx: ExplainPlanContext,
1743        optimizer: optimize::peek::Optimizer,
1744        insights_ctx: Option<Box<PlanInsightsContext>>,
1745    },
1746    ExplainPushdown {
1747        imports: BTreeMap<GlobalId, mz_expr::MapFilterProject>,
1748        determination: TimestampDetermination,
1749    },
1750}