mz_adapter/frontend_peek.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::sync::Arc;

use itertools::{Either, Itertools};
use mz_adapter_types::dyncfgs::CONSTRAINT_BASED_TIMESTAMP_SELECTION;
use mz_adapter_types::timestamp_selection::ConstraintBasedTimestampSelection;
use mz_compute_types::ComputeInstanceId;
use mz_controller_types::ClusterId;
use mz_expr::{CollectionPlan, ResultSpec};
use mz_ore::cast::{CastFrom, CastLossy};
use mz_ore::collections::CollectionExt;
use mz_ore::now::EpochMillis;
use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
use mz_repr::role_id::RoleId;
use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
use mz_sql::ast::Raw;
use mz_sql::catalog::CatalogCluster;
use mz_sql::plan::Params;
use mz_sql::plan::{self, Plan, QueryWhen};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::IsolationLevel;
use mz_sql_parser::ast::{CopyDirection, CopyRelation, ExplainStage, ShowStatement, Statement};
use mz_transform::EmptyStatisticsOracle;
use mz_transform::dataflow::DataflowMetainfo;
use opentelemetry::trace::TraceContextExt;
use tracing::{Span, debug, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::catalog::{Catalog, CatalogState};
use crate::command::Command;
use crate::coord::peek::{FastPathPlan, PeekPlan};
use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle};
use crate::coord::timeline::timedomain_for;
use crate::coord::timestamp_selection::TimestampDetermination;
use crate::coord::{
    Coordinator, CopyToContext, ExecuteContextGuard, ExplainContext, ExplainPlanContext,
    TargetCluster,
};
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::OptimizerTrace;
use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
use crate::optimize::{Optimize, OptimizerError};
use crate::session::{Session, TransactionOps, TransactionStatus};
use crate::statement_logging::WatchSetCreation;
use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
use crate::{
    AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, PeekClient, ReadHolds,
    TimelineContext, TimestampContext, TimestampProvider, optimize,
};
use crate::{coord, metrics};

impl PeekClient {
    /// Attempt to sequence a peek from the session task.
    ///
    /// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the
    /// Coordinator's sequencing. If it returns an error, it should be returned to the user.
    ///
    /// `outer_ctx_extra` is `Some` when we are executing as part of an outer statement, e.g., a
    /// FETCH triggering the execution of the underlying query.
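    ///
    /// A minimal sketch of the intended call site (hypothetical; names like
    /// `coordinator_execute` are illustrative, not real APIs):
    ///
    /// ```ignore
    /// match peek_client.try_frontend_peek(portal_name, session, &mut outer_ctx_extra).await? {
    ///     Some(response) => response, // handled entirely by the frontend sequencing
    ///     None => {
    ///         // Fall back to the Coordinator's peek sequencing.
    ///         coordinator_execute(portal_name, session).await?
    ///     }
    /// }
    /// ```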
    pub(crate) async fn try_frontend_peek(
        &mut self,
        portal_name: &str,
        session: &mut Session,
        outer_ctx_extra: &mut Option<ExecuteContextGuard>,
    ) -> Result<Option<ExecuteResponse>, AdapterError> {
        // # From handle_execute

        if session.vars().emit_trace_id_notice() {
            let span_context = tracing::Span::current()
                .context()
                .span()
                .span_context()
                .clone();
            if span_context.is_valid() {
                session.add_notice(AdapterNotice::QueryTrace {
                    trace_id: span_context.trace_id(),
                });
            }
        }

        // TODO(peek-seq): This snapshot is wasted when we end up bailing out from the frontend peek
        // sequencing. We could solve this with the optimization where we continuously keep a
        // catalog snapshot in the session, and only get a new one when the catalog revision has
        // changed, which we could detect with an atomic read. But this problem will go away anyhow
        // once we reach the point where we never fall back to the old sequencing.
        let catalog = self.catalog_snapshot("try_frontend_peek").await;

        // Extract things from the portal.
        let (stmt, params, logging, lifecycle_timestamps) = {
            if let Err(err) = Coordinator::verify_portal(&*catalog, session, portal_name) {
                outer_ctx_extra
                    .take()
                    .and_then(|guard| guard.defuse().retire());
                return Err(err);
            }
            let portal = session
                .get_portal_unverified(portal_name)
                // The portal is a session-level thing, so it couldn't have concurrently disappeared
                // since the above verification.
                .expect("called verify_portal above");
            let params = portal.parameters.clone();
            let stmt = portal.stmt.clone();
            let logging = Arc::clone(&portal.logging);
            let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
            (stmt, params, logging, lifecycle_timestamps)
        };

        // Before planning, check if this is a statement type we can handle.
        // This must happen BEFORE statement logging setup to avoid orphaned execution records.
        if let Some(ref stmt) = stmt {
            match &**stmt {
                Statement::Select(_)
                | Statement::ExplainAnalyzeObject(_)
                | Statement::ExplainAnalyzeCluster(_)
                | Statement::Show(ShowStatement::ShowObjects(_))
                | Statement::Show(ShowStatement::ShowColumns(_)) => {
                    // These are always fine, just continue.
                    // Note: EXPLAIN ANALYZE will `plan` to `Plan::Select`.
                    // Note: ShowObjects plans to `Plan::Select`, ShowColumns plans to `Plan::ShowColumns`.
                    // We handle `Plan::ShowColumns` specially in `try_frontend_peek_inner`.
                }
                Statement::ExplainPlan(explain_stmt) => {
                    // Only handle ExplainPlan for SELECT statements.
                    // We don't want to handle e.g. EXPLAIN CREATE MATERIALIZED VIEW here, because that
                    // requires purification before planning, which the frontend peek sequencing doesn't
                    // do.
                    match &explain_stmt.explainee {
                        mz_sql_parser::ast::Explainee::Select(..) => {
                            // This is a SELECT, continue.
                        }
                        _ => {
                            debug!(
                                "Bailing out from try_frontend_peek, because EXPLAIN is not for a SELECT query"
                            );
                            return Ok(None);
                        }
                    }
                }
                Statement::ExplainPushdown(explain_stmt) => {
                    // Only handle EXPLAIN FILTER PUSHDOWN for non-BROKEN SELECT statements.
                    match &explain_stmt.explainee {
                        mz_sql_parser::ast::Explainee::Select(_, false) => {}
                        _ => {
                            debug!(
                                "Bailing out from try_frontend_peek, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is for EXPLAIN BROKEN"
                            );
                            return Ok(None);
                        }
                    }
                }
                Statement::Copy(copy_stmt) => {
                    match &copy_stmt.direction {
                        CopyDirection::To => {
                            // Check for SUBSCRIBE inside COPY TO: we don't handle Plan::Subscribe.
                            if matches!(&copy_stmt.relation, CopyRelation::Subscribe(_)) {
                                debug!(
                                    "Bailing out from try_frontend_peek, because COPY (SUBSCRIBE ...) TO is not supported"
                                );
                                return Ok(None);
                            }
                            // This is COPY TO (SELECT), continue.
                        }
                        CopyDirection::From => {
                            debug!(
                                "Bailing out from try_frontend_peek, because COPY FROM is not supported"
                            );
                            return Ok(None);
                        }
                    }
                }
                _ => {
                    debug!(
                        "Bailing out from try_frontend_peek, because statement type is not supported"
                    );
                    return Ok(None);
                }
            }
        }

        // Set up statement logging, and log the beginning of execution.
        // (But only if we're not executing in the context of another statement.)
        let statement_logging_id = if outer_ctx_extra.is_none() {
            // This is a new statement, so begin statement logging.
            let result = self.statement_logging_frontend.begin_statement_execution(
                session,
                &params,
                &logging,
                catalog.system_config(),
                lifecycle_timestamps,
            );

            if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result {
                self.log_began_execution(began_execution, mseh_update, prepared_statement);
                Some(logging_id)
            } else {
                None
            }
        } else {
            // We're executing in the context of another statement (e.g., FETCH),
            // so extract the statement logging ID from the outer context if present.
            // We take ownership and retire the outer context here. The end of execution will be
            // logged in one of the following ways:
            // - At the end of this function, if the execution is finished by then.
            // - Later by the Coordinator, either due to RegisterFrontendPeek or ExecuteSlowPathPeek.
            outer_ctx_extra
                .take()
                .and_then(|guard| guard.defuse().retire())
        };

        let result = self
            .try_frontend_peek_inner(session, catalog, stmt, params, statement_logging_id)
            .await;

        // Log the end of execution if we are logging this statement and execution has already
        // ended.
        if let Some(logging_id) = statement_logging_id {
            let reason = match &result {
                // Streaming results are handled asynchronously by the coordinator.
                Ok(Some(ExecuteResponse::SendingRowsStreaming { .. })) => {
                    // Don't log here - the peek is still executing.
                    // It will be logged when handle_peek_notification is called.
                    return result;
                }
                // COPY TO needs to check its inner response.
                Ok(Some(resp @ ExecuteResponse::CopyTo { resp: inner, .. })) => {
                    match inner.as_ref() {
                        ExecuteResponse::SendingRowsStreaming { .. } => {
                            // Don't log here - the peek is still executing.
                            // It will be logged when handle_peek_notification is called.
                            return result;
                        }
                        // For non-streaming COPY TO responses, use the outer CopyTo for conversion.
                        _ => resp.into(),
                    }
                }
                // Bailout case, which should not happen.
                Ok(None) => {
                    soft_panic_or_log!(
                        "Bailed out from `try_frontend_peek_inner` after we already logged the beginning of statement execution."
                    );
                    // This statement will be handled by the old peek sequencing, which will do its
                    // own statement logging from the beginning. So, let's close out this one.
                    self.log_ended_execution(
                        logging_id,
                        StatementEndedExecutionReason::Errored {
                            error: "Internal error: bailed out from `try_frontend_peek_inner`"
                                .to_string(),
                        },
                    );
                    return result;
                }
                // All other success responses - use the From implementation.
                // TODO(peek-seq): After we delete the old peek sequencing, we'll be able to adjust
                // the From implementation to do exactly what we need in the frontend peek
                // sequencing, so that the above special cases won't be needed.
                Ok(Some(resp)) => resp.into(),
                Err(e) => StatementEndedExecutionReason::Errored {
                    error: e.to_string(),
                },
            };

            self.log_ended_execution(logging_id, reason);
        }

        result
    }

    /// This is encapsulated in an inner function so that the outer function can still do statement
    /// logging after the `?` returns of the inner function.
    async fn try_frontend_peek_inner(
        &mut self,
        session: &mut Session,
        catalog: Arc<Catalog>,
        stmt: Option<Arc<Statement<Raw>>>,
        params: Params,
        statement_logging_id: Option<crate::statement_logging::StatementLoggingId>,
    ) -> Result<Option<ExecuteResponse>, AdapterError> {
        let stmt = match stmt {
            Some(stmt) => stmt,
            None => {
                debug!("try_frontend_peek_inner succeeded on an empty query");
                return Ok(Some(ExecuteResponse::EmptyQuery));
            }
        };

        session
            .metrics()
            .query_total(&[
                metrics::session_type_label_value(session.user()),
                metrics::statement_type_label_value(&stmt),
            ])
            .inc();

        // # From handle_execute_inner

        let conn_catalog = catalog.for_session(session);
        // (`resolved_ids` should be derivable from `stmt`. If `stmt` is later transformed to
        // remove/add IDs, then `resolved_ids` should be updated to also remove/add those IDs.)
        let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?;

        let pcx = session.pcx();
        let plan = mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, &params, &resolved_ids)?;

        let (select_plan, explain_ctx, copy_to_ctx) = match &plan {
            Plan::Select(select_plan) => {
                let explain_ctx = if session.vars().emit_plan_insights_notice() {
                    let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
                    ExplainContext::PlanInsightsNotice(optimizer_trace)
                } else {
                    ExplainContext::None
                };
                (select_plan, explain_ctx, None)
            }
            Plan::ShowColumns(show_columns_plan) => {
                // ShowColumns wraps a SelectPlan; extract it and proceed as normal.
                (&show_columns_plan.select_plan, ExplainContext::None, None)
            }
            Plan::ExplainPlan(plan::ExplainPlanPlan {
                stage,
                format,
                config,
                explainee:
                    plan::Explainee::Statement(plan::ExplaineeStatement::Select { broken, plan, desc }),
            }) => {
                // Create an OptimizerTrace to collect optimizer plans.
                let optimizer_trace = OptimizerTrace::new(stage.paths());
                let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
                    broken: *broken,
                    config: config.clone(),
                    format: *format,
                    stage: *stage,
                    replan: None,
                    desc: Some(desc.clone()),
                    optimizer_trace,
                });
                (plan, explain_ctx, None)
            }
            // COPY TO S3
            Plan::CopyTo(plan::CopyToPlan {
                select_plan,
                desc,
                to,
                connection,
                connection_id,
                format,
                max_file_size,
            }) => {
                let uri = eval_copy_to_uri(to.clone(), session, catalog.state())?;

                // (output_batch_count will be set later)
                let copy_to_ctx = CopyToContext {
                    desc: desc.clone(),
                    uri,
                    connection: connection.clone(),
                    connection_id: *connection_id,
                    format: format.clone(),
                    max_file_size: *max_file_size,
                    output_batch_count: None,
                };

                (select_plan, ExplainContext::None, Some(copy_to_ctx))
            }
            Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => {
                // Only handle EXPLAIN FILTER PUSHDOWN for SELECT statements.
                match explainee {
                    plan::Explainee::Statement(plan::ExplaineeStatement::Select {
                        broken: false,
                        plan,
                        desc: _,
                    }) => {
                        let explain_ctx = ExplainContext::Pushdown;
                        (plan, explain_ctx, None)
                    }
                    _ => {
                        // This shouldn't happen, because we already checked for this at the AST
                        // level before calling `try_frontend_peek_inner`.
                        soft_panic_or_log!(
                            "unexpected EXPLAIN FILTER PUSHDOWN plan kind in frontend peek sequencing: {:?}",
                            explainee
                        );
                        debug!(
                            "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN"
                        );
                        return Ok(None);
                    }
                }
            }
            Plan::SideEffectingFunc(sef_plan) => {
                // Side-effecting functions need Coordinator state (e.g., active_conns),
                // so delegate to the Coordinator via a Command.
                // The RBAC check is performed in the Coordinator, where active_conns is available.
                let response = self
                    .call_coordinator(|tx| Command::ExecuteSideEffectingFunc {
                        plan: sef_plan.clone(),
                        conn_id: session.conn_id().clone(),
                        current_role: session.role_metadata().current_role,
                        tx,
                    })
                    .await?;
                return Ok(Some(response));
            }
            _ => {
                // This shouldn't happen, because we already checked for this at the AST
                // level before calling `try_frontend_peek_inner`.
                soft_panic_or_log!(
                    "Unexpected plan kind in frontend peek sequencing: {:?}",
                    plan
                );
                debug!(
                    "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, side-effecting SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO S3"
                );
                return Ok(None);
            }
        };

        // # From sequence_plan

        // We have checked the plan kind above.
        assert!(plan.allowed_in_read_only());

        let target_cluster = match session.transaction().cluster() {
            // Use the current transaction's cluster.
            Some(cluster_id) => TargetCluster::Transaction(cluster_id),
            // If there isn't a cluster set for the current transaction, then try to auto-route.
            None => {
                coord::catalog_serving::auto_run_on_catalog_server(&conn_catalog, session, &plan)
            }
        };
        let (cluster, target_cluster_id, target_cluster_name) = {
            let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
            (cluster, cluster.id, &cluster.name)
        };

        // Log cluster selection.
        if let Some(logging_id) = &statement_logging_id {
            self.log_set_cluster(*logging_id, target_cluster_id);
        }

        coord::catalog_serving::check_cluster_restrictions(
            target_cluster_name.as_str(),
            &conn_catalog,
            &plan,
        )?;

        rbac::check_plan(
            &conn_catalog,
            // We can't look at `active_conns` here, but that's ok, because this case was already
            // handled above via `Command::ExecuteSideEffectingFunc`.
            None::<fn(u32) -> Option<RoleId>>,
            session,
            &plan,
            Some(target_cluster_id),
            &resolved_ids,
        )?;

        if let Some((_, wait_future)) =
            coord::appends::waiting_on_startup_appends(&*catalog, session, &plan)
        {
            wait_future.await;
        }

        let max_query_result_size = Some(session.vars().max_query_result_size());

        // # From sequence_peek

        // # From peek_validate

        let compute_instance_snapshot =
            ComputeInstanceSnapshot::new_without_collections(cluster.id());

        let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config())
            .override_from(&catalog.get_cluster(cluster.id()).config.features())
            .override_from(&explain_ctx);

        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
            return Err(AdapterError::NoClusterReplicasAvailable {
                name: cluster.name.clone(),
                is_managed: cluster.is_managed(),
            });
        }

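        // Allocate fresh transient IDs for the optimizers below: `view_id` names the
        // optimized query as a transient view, and `index_id` names the transient
        // index that a peek dataflow would read from.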
        let (_, view_id) = self.transient_id_gen.allocate_id();
        let (_, index_id) = self.transient_id_gen.allocate_id();

        let mut optimizer = if let Some(mut copy_to_ctx) = copy_to_ctx {
            // COPY TO path: calculate output_batch_count and create the copy_to optimizer.
            let worker_counts = cluster.replicas().map(|r| {
                let loc = &r.config.location;
                loc.workers().unwrap_or_else(|| loc.num_processes())
            });
            let max_worker_count = match worker_counts.max() {
                Some(count) => u64::cast_from(count),
                None => {
                    return Err(AdapterError::NoClusterReplicasAvailable {
                        name: cluster.name.clone(),
                        is_managed: cluster.is_managed(),
                    });
                }
            };
            copy_to_ctx.output_batch_count = Some(max_worker_count);

            Either::Right(optimize::copy_to::Optimizer::new(
                Arc::clone(&catalog),
                compute_instance_snapshot.clone(),
                view_id,
                copy_to_ctx,
                optimizer_config,
                self.optimizer_metrics.clone(),
            ))
        } else {
            // SELECT/EXPLAIN path: create the peek optimizer.
            Either::Left(optimize::peek::Optimizer::new(
                Arc::clone(&catalog),
                compute_instance_snapshot.clone(),
                select_plan.finishing.clone(),
                view_id,
                index_id,
                optimizer_config,
                self.optimizer_metrics.clone(),
            ))
        };

        let target_replica_name = session.vars().cluster_replica();
        let mut target_replica = target_replica_name
            .map(|name| {
                cluster
                    .replica_id(name)
                    .ok_or(AdapterError::UnknownClusterReplica {
                        cluster_name: cluster.name.clone(),
                        replica_name: name.to_string(),
                    })
            })
            .transpose()?;

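        // Collect the IDs of all collections that the query's source expression
        // depends on; these drive the timeline validation, read-hold acquisition,
        // and timestamp selection below.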
        let source_ids = select_plan.source.depends_on();
        // TODO(peek-seq): validate_timeline_context can be expensive in real scenarios (though not
        // in simple benchmarks), because it traverses transitive dependencies even of indexed views
        // and materialized views (also traversing their MIR plans).
        let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?;
        if matches!(timeline_context, TimelineContext::TimestampIndependent)
            && select_plan.source.contains_temporal()?
        {
            // If the source IDs are timestamp independent but the query contains temporal functions,
            // then the timeline context needs to be upgraded to timestamp dependent. This is
            // required because `source_ids` doesn't contain functions.
            timeline_context = TimelineContext::TimestampDependent;
        }

        let notices = coord::sequencer::check_log_reads(
            &catalog,
            cluster,
            &source_ids,
            &mut target_replica,
            session.vars(),
        )?;
        session.add_notices(notices);

        // # From peek_linearize_timestamp

        let isolation_level = session.vars().transaction_isolation().clone();
        let timeline = Coordinator::get_timeline(&timeline_context);
        let needs_linearized_read_ts =
            Coordinator::needs_linearized_read_ts(&isolation_level, &select_plan.when);

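        // For reads that must be linearized, consult the timeline's timestamp oracle;
        // the oracle's read timestamp constrains the query timestamp we may select.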
        let oracle_read_ts = match timeline {
            Some(timeline) if needs_linearized_read_ts => {
                let oracle = self.ensure_oracle(timeline).await?;
                let oracle_read_ts = oracle.read_ts().await;
                Some(oracle_read_ts)
            }
            Some(_) | None => None,
        };

        // # From peek_real_time_recency

        let vars = session.vars();
        let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
            && !session.contains_read_timestamp()
        {
            // Only call the coordinator when we actually need real-time recency.
            self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
                source_ids: source_ids.clone(),
                real_time_recency_timeout: *vars.real_time_recency_timeout(),
                tx,
            })
            .await?
        } else {
            None
        };

        // # From peek_timestamp_read_hold

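        // Expand the raw source IDs into an id bundle of storage collections and
        // compute indexes that are together sufficient to answer the query on the
        // target cluster.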
        let dataflow_builder = DataflowBuilder::new(catalog.state(), compute_instance_snapshot);
        let input_id_bundle = dataflow_builder.sufficient_collections(source_ids.clone());

        // ## From sequence_peek_timestamp

        // Warning: This will be false for AS OF queries, even if we are otherwise inside a
        // multi-statement transaction. (It's also false for FreshestTableWrite, which currently
        // covers only read-then-write queries, which can't be part of multi-statement transactions,
        // so FreshestTableWrite doesn't matter.)
        //
        // TODO(peek-seq): It's not totally clear to me what the intended semantics are for AS OF
        // queries inside a transaction: We clearly can't use the transaction timestamp, but the old
        // peek sequencing still does a timedomain validation. The new peek sequencing does not do
        // timedomain validation for AS OF queries, which seems more natural. But I think it would
        // be cleanest to simply disallow AS OF queries inside transactions.
        let in_immediate_multi_stmt_txn = session
            .transaction()
            .in_immediate_multi_stmt_txn(&select_plan.when);

        // Fetch or generate a timestamp for this query, and fetch or acquire read holds.
        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
            // Use the transaction's timestamp if it exists and this isn't an AS OF query.
            // (`in_immediate_multi_stmt_txn` is false for AS OF queries.)
            Some(
                determination @ TimestampDetermination {
                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
                    ..
                },
            ) if in_immediate_multi_stmt_txn => {
                // This is a subsequent (non-AS OF, non-constant) query in a multi-statement
                // transaction. We now:
                // - Validate that the query only accesses collections within the transaction's
                //   timedomain (which we know from the stored read holds).
                // - Use the transaction's stored timestamp determination.
                // - Use the (relevant subset of the) transaction's read holds.

                let txn_read_holds_opt = self
                    .call_coordinator(|tx| Command::GetTransactionReadHoldsBundle {
                        conn_id: session.conn_id().clone(),
                        tx,
                    })
                    .await;

                if let Some(txn_read_holds) = txn_read_holds_opt {
                    let allowed_id_bundle = txn_read_holds.id_bundle();
                    let outside = input_id_bundle.difference(&allowed_id_bundle);

                    // Queries without a timestamp and timeline can belong to any existing timedomain.
                    if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
                        let valid_names =
                            allowed_id_bundle.resolve_names(&*catalog, session.conn_id());
                        let invalid_names = outside.resolve_names(&*catalog, session.conn_id());
                        return Err(AdapterError::RelationOutsideTimeDomain {
                            relations: invalid_names,
                            names: valid_names,
                        });
                    }

                    // Extract the subset of read holds for the collections this query accesses.
                    let read_holds = txn_read_holds.subset(&input_id_bundle);

                    (determination, read_holds)
                } else {
                    // This should never happen: we're in a subsequent query of a multi-statement
                    // transaction (we have a transaction timestamp), but the coordinator has no
                    // transaction read holds stored. This indicates a bug in the transaction
                    // handling.
                    return Err(AdapterError::Internal(
                        "Missing transaction read holds for multi-statement transaction"
                            .to_string(),
                    ));
                }
            }
            _ => {
                // There is no timestamp determination yet for this transaction. Either:
                // - We are not in a multi-statement transaction.
                // - This is the first (non-AS OF) query in a multi-statement transaction.
                // - This is an AS OF query.
                // - This is a constant query (`TimestampContext::NoTimestamp`).

                let timedomain_bundle;
                let determine_bundle = if in_immediate_multi_stmt_txn {
                    // This is the first (non-AS OF) query in a multi-statement transaction.
                    // Determine a timestamp that will be valid for anything in any schema
                    // referenced by the first query.
                    timedomain_bundle = timedomain_for(
                        &*catalog,
                        &dataflow_builder,
                        &source_ids,
                        &timeline_context,
                        session.conn_id(),
                        target_cluster_id,
                    )?;
                    &timedomain_bundle
                } else {
                    // Simply use the inputs of the current query.
                    &input_id_bundle
                };
                let (determination, read_holds) = self
                    .frontend_determine_timestamp(
                        catalog.state(),
                        session,
                        determine_bundle,
                        &select_plan.when,
                        target_cluster_id,
                        &timeline_context,
                        oracle_read_ts,
                        real_time_recency_ts,
                    )
                    .await?;

                // If this is the first (non-AS OF) query in a multi-statement transaction, store
                // the read holds in the coordinator, so subsequent queries can validate against
                // them.
                if in_immediate_multi_stmt_txn {
                    self.call_coordinator(|tx| Command::StoreTransactionReadHolds {
                        conn_id: session.conn_id().clone(),
                        read_holds: read_holds.clone(),
                        tx,
                    })
                    .await;
                }

                (determination, read_holds)
            }
        };

        {
            // Assert that we have a read hold for all the collections in our `input_id_bundle`.
            for id in input_id_bundle.iter() {
                let s = read_holds.storage_holds.contains_key(&id);
                let c = read_holds
                    .compute_ids()
                    .map(|(_instance, coll)| coll)
                    .contains(&id);
                soft_assert_or_log!(
                    s || c,
                    "missing read hold for collection {} in `input_id_bundle`",
                    id
                );
            }

            // Assert that each part of the `input_id_bundle` corresponds to the right part of
            // `read_holds`.
            for id in input_id_bundle.storage_ids.iter() {
                soft_assert_or_log!(
                    read_holds.storage_holds.contains_key(id),
                    "missing storage read hold for collection {} in `input_id_bundle`",
                    id
                );
            }
            for id in input_id_bundle
                .compute_ids
                .iter()
                .flat_map(|(_instance, colls)| colls)
            {
                soft_assert_or_log!(
                    read_holds
                        .compute_ids()
                        .map(|(_instance, coll)| coll)
                        .contains(id),
                    "missing compute read hold for collection {} in `input_id_bundle`",
                    id,
                );
            }
        }

        // (TODO(peek-seq): The TODO below is copied from the old peek sequencing. We should resolve
        // it when we decide what to do with `AS OF` in transactions.)
        // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
        // arbitrary and we don't recall why we did it (possibly an error!). Change this to always
        // set the transaction ops. Decide and document what our policy should be on AS OF queries.
        // Maybe they shouldn't be allowed in transactions at all, because it's hard to explain
        // what's going on there. This should probably get a small design document.

        // We only track the peeks in the session if the query doesn't use AS
        // OF or we're inside an explicit transaction. The latter case is
        // necessary to support PG's `BEGIN` semantics, whose behavior can
        // depend on whether or not reads have occurred in the txn.
        let requires_linearization = (&explain_ctx).into();
        let mut transaction_determination = determination.clone();
        if select_plan.when.is_transactional() {
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id: target_cluster_id,
                requires_linearization,
            })?;
        } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
            // If the query uses AS OF, then ignore the timestamp.
            transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id: target_cluster_id,
                requires_linearization,
            })?;
        };

        // # From peek_optimize

        let stats = statistics_oracle(
            session,
            &source_ids,
            &determination.timestamp_context.antichain(),
            true,
            catalog.system_config(),
            &*self.storage_collections,
        )
        .await
        .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));

        // Generate data structures that can be moved to another task where we will perform
        // possibly expensive optimizations.
        let timestamp_context = determination.timestamp_context.clone();
        let session_meta = session.meta();
        let now = catalog.config().now.clone();
        let select_plan = select_plan.clone();
        let target_cluster_name = target_cluster_name.clone();
        let needs_plan_insights = explain_ctx.needs_plan_insights();
        let determination_for_pushdown = if matches!(explain_ctx, ExplainContext::Pushdown) {
            // This is a hairy data structure, so avoid the clone if we are not in
            // EXPLAIN FILTER PUSHDOWN.
            Some(determination.clone())
        } else {
            None
        };

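        // Capture the current tracing span, so that the work done on the blocking
        // task below is attributed to this query's trace.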
        let span = Span::current();

        // Prepare data for plan insights if needed.
        let catalog_for_insights = if needs_plan_insights {
            Some(Arc::clone(&catalog))
        } else {
            None
        };
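        // Plan insights may re-optimize the query against other user clusters (e.g.,
        // to see whether it would be fast-path elsewhere), so snapshot each of them.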
        let mut compute_instances = BTreeMap::new();
        if needs_plan_insights {
            for user_cluster in catalog.user_clusters() {
                let snapshot = ComputeInstanceSnapshot::new_without_collections(user_cluster.id);
                compute_instances.insert(user_cluster.name.clone(), snapshot);
            }
        }

        let source_ids_for_closure = source_ids.clone();
        let optimization_future = mz_ore::task::spawn_blocking(
            || "optimize peek",
            move || {
                span.in_scope(|| {
                    let _dispatch_guard = explain_ctx.dispatch_guard();

                    let raw_expr = select_plan.source.clone();

                    // The purpose of wrapping the following in a closure is to control where the
                    // `?`s return from, so that even when a `catch_unwind_optimize` call fails,
                    // we can still handle `EXPLAIN BROKEN`.
                    let pipeline = || -> Result<Either<optimize::peek::GlobalLirPlan, optimize::copy_to::GlobalLirPlan>, OptimizerError> {
                        match optimizer.as_mut() {
                            Either::Left(optimizer) => {
                                // SELECT/EXPLAIN path
                                // HIR ⇒ MIR lowering and MIR optimization (local)
                                let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr.clone())?;
                                // Attach resolved context required to continue the pipeline.
                                let local_mir_plan =
                                    local_mir_plan.resolve(timestamp_context.clone(), &session_meta, stats);
                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
                                let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
                                Ok(Either::Left(global_lir_plan))
                            }
                            Either::Right(optimizer) => {
                                // COPY TO path
                                // HIR ⇒ MIR lowering and MIR optimization (local)
                                let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr.clone())?;
                                // Attach resolved context required to continue the pipeline.
                                let local_mir_plan =
                                    local_mir_plan.resolve(timestamp_context.clone(), &session_meta, stats);
                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
                                let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
                                Ok(Either::Right(global_lir_plan))
                            }
                        }
                    };

                    let global_lir_plan_result = pipeline();
                    let optimization_finished_at = now();

                    let create_insights_ctx = |optimizer: &optimize::peek::Optimizer, is_notice: bool| -> Option<Box<PlanInsightsContext>> {
                        if !needs_plan_insights {
                            return None;
                        }

                        let catalog = catalog_for_insights.as_ref()?;

                        let enable_re_optimize = if needs_plan_insights {
                            // Disable any plan insights that use the optimizer if we only want the
                            // notice and plan optimization took longer than the threshold. This is
                            // to prevent a situation where optimizing takes a while and there are
                            // lots of clusters, which would delay peek execution by the product of
                            // those.
                            //
                            // (This heuristic doesn't work well, see #9492.)
                            let opt_limit = mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
                                .get(catalog.system_config().dyncfgs());
                            !(is_notice && optimizer.duration() > opt_limit)
                        } else {
                            false
                        };

                        Some(Box::new(PlanInsightsContext {
                            stmt: select_plan.select.as_deref().cloned().map(Statement::Select),
                            raw_expr: raw_expr.clone(),
                            catalog: Arc::clone(catalog),
                            compute_instances,
                            target_instance: target_cluster_name,
                            metrics: optimizer.metrics().clone(),
                            finishing: optimizer.finishing().clone(),
                            optimizer_config: optimizer.config().clone(),
                            session: session_meta,
                            timestamp_context,
                            view_id: optimizer.select_id(),
                            index_id: optimizer.index_id(),
                            enable_re_optimize,
                        }))
                    };

                    match global_lir_plan_result {
                        Ok(Either::Left(global_lir_plan)) => {
                            // SELECT/EXPLAIN path
                            let optimizer = optimizer.unwrap_left();
                            match explain_ctx {
                                ExplainContext::Plan(explain_ctx) => {
                                    let (_, df_meta, _) = global_lir_plan.unapply();
                                    let insights_ctx = create_insights_ctx(&optimizer, false);
                                    Ok(Execution::ExplainPlan {
                                        df_meta,
                                        explain_ctx,
                                        optimizer,
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::None => {
                                    Ok(Execution::Peek {
                                        global_lir_plan,
                                        optimization_finished_at,
                                        plan_insights_optimizer_trace: None,
                                        insights_ctx: None,
                                    })
                                }
                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
                                    let insights_ctx = create_insights_ctx(&optimizer, true);
                                    Ok(Execution::Peek {
                                        global_lir_plan,
                                        optimization_finished_at,
                                        plan_insights_optimizer_trace: Some(optimizer_trace),
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::Pushdown => {
                                    let (plan, _, _) = global_lir_plan.unapply();
                                    let imports = match plan {
                                        PeekPlan::SlowPath(plan) => plan
                                            .desc
                                            .source_imports
                                            .into_iter()
                                            .filter_map(|(id, import)| import.desc.arguments.operators.map(|mfp| (id, mfp)))
                                            .collect(),
                                        PeekPlan::FastPath(_) => std::collections::BTreeMap::default(),
                                    };
                                    Ok(Execution::ExplainPushdown {
                                        imports,
                                        determination: determination_for_pushdown.expect("it's present for the ExplainPushdown case"),
                                    })
                                }
                            }
                        }
                        Ok(Either::Right(global_lir_plan)) => {
                            // COPY TO S3 path
                            Ok(Execution::CopyToS3 {
                                global_lir_plan,
                                source_ids: source_ids_for_closure,
                            })
                        }
                        Err(err) => {
                            if optimizer.is_right() {
                                // COPY TO has no EXPLAIN BROKEN support.
                                return Err(err);
                            }
                            // SELECT/EXPLAIN error handling
                            let optimizer = optimizer.expect_left("checked above");
                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
                                if explain_ctx.broken {
                                    // EXPLAIN BROKEN: log the error and continue with defaults.
                                    tracing::error!("error while handling EXPLAIN statement: {}", err);
                                    Ok(Execution::ExplainPlan {
                                        df_meta: Default::default(),
                                        explain_ctx,
                                        optimizer,
                                        insights_ctx: None,
                                    })
                                } else {
                                    Err(err)
                                }
                            } else {
                                Err(err)
                            }
                        }
                    }
                })
            },
        );
        let optimization_timeout = *session.vars().statement_timeout();
        let optimization_result =
            // Note: spawn_blocking tasks cannot be cancelled, so on timeout we stop waiting, but
            // the optimization task continues running in the background until completion. See
            // https://github.com/MaterializeInc/database-issues/issues/8644 for properly cancelling
            // optimizer runs.
            match tokio::time::timeout(optimization_timeout, optimization_future).await {
                Ok(Ok(result)) => result,
                Ok(Err(optimizer_error)) => {
                    return Err(AdapterError::Internal(format!(
                        "internal error in optimizer: {}",
                        optimizer_error
                    )));
                }
                Err(_elapsed) => {
                    warn!("optimize peek timed out after {:?}", optimization_timeout);
                    return Err(AdapterError::StatementTimeout);
                }
            };

        // Log that optimization finished.
        if let Some(logging_id) = &statement_logging_id {
            self.log_lifecycle_event(*logging_id, StatementLifecycleEvent::OptimizationFinished);
        }

        // Assert that the read holds are correct for the execution plan.
        Self::assert_read_holds_correct(
            &read_holds,
            &optimization_result,
            &determination,
            target_cluster_id,
        );

        // Handle the optimization result: either generate EXPLAIN output or continue with
        // execution.
        match optimization_result {
1050            Execution::ExplainPlan {
1051                df_meta,
1052                explain_ctx,
1053                optimizer,
1054                insights_ctx,
1055            } => {
1056                let rows = coord::sequencer::explain_plan_inner(
1057                    session,
1058                    &catalog,
1059                    df_meta,
1060                    explain_ctx,
1061                    optimizer,
1062                    insights_ctx,
1063                )
1064                .await?;
1065
1066                Ok(Some(ExecuteResponse::SendingRowsImmediate {
1067                    rows: Box::new(rows.into_row_iter()),
1068                }))
1069            }
1070            Execution::ExplainPushdown {
1071                imports,
1072                determination,
1073            } => {
1074                // # From peek_explain_pushdown
1075
1076                let as_of = determination.timestamp_context.antichain();
1077                let mz_now = determination
1078                    .timestamp_context
1079                    .timestamp()
1080                    .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1081                    .unwrap_or_else(ResultSpec::value_all);
1082
1083                Ok(Some(
1084                    coord::sequencer::explain_pushdown_future_inner(
1085                        session,
1086                        &*catalog,
1087                        &self.storage_collections,
1088                        as_of,
1089                        mz_now,
1090                        imports,
1091                    )
1092                    .await
1093                    .await?,
1094                ))
1095            }
1096            Execution::Peek {
1097                global_lir_plan,
1098                optimization_finished_at: _optimization_finished_at,
1099                plan_insights_optimizer_trace,
1100                insights_ctx,
1101            } => {
1102                // Continue with normal execution
1103                // # From peek_finish
1104
1105                // The typ here was generated from the HIR SQL type and simply stored in LIR.
1106                let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
1107
1108                coord::sequencer::emit_optimizer_notices(
1109                    &*catalog,
1110                    session,
1111                    &df_meta.optimizer_notices,
1112                );
1113
1114                // Generate plan insights notice if needed
1115                if let Some(trace) = plan_insights_optimizer_trace {
1116                    let target_cluster = catalog.get_cluster(target_cluster_id);
1117                    let features = OptimizerFeatures::from(catalog.system_config())
1118                        .override_from(&target_cluster.config.features());
1119                    let insights = trace
1120                        .into_plan_insights(
1121                            &features,
1122                            &catalog.for_session(session),
1123                            Some(select_plan.finishing.clone()),
1124                            Some(target_cluster),
1125                            df_meta.clone(),
1126                            insights_ctx,
1127                        )
1128                        .await?;
1129                    session.add_notice(AdapterNotice::PlanInsights(insights));
1130                }
1131
1132                // # Now back to peek_finish
1133
                let watch_set = statement_logging_id.map(|logging_id| {
                    WatchSetCreation::new(
                        logging_id,
                        catalog.state(),
                        &input_id_bundle,
                        determination.timestamp_context.timestamp_or_default(),
                    )
                });

                let max_result_size = catalog.system_config().max_result_size();

                // Clone `determination` if we need it for emit_timestamp_notice, since it may be
                // moved into Command::ExecuteSlowPathPeek.
                let determination_for_notice = if session.vars().emit_timestamp_notice() {
                    Some(determination.clone())
                } else {
                    None
                };
                let response = match peek_plan {
                    PeekPlan::FastPath(fast_path_plan) => {
                        if let Some(logging_id) = &statement_logging_id {
                            // TODO(peek-seq): We should also log the timestamp for
                            // FastPathPlan::Constant. The only reason we don't at the moment is
                            // to match the old peek sequencing, so that statement logging tests
                            // pass with the frontend peek sequencing turned both on and off.
                            //
                            // When the old sequencing is removed, we should make a couple of
                            // changes to how we log timestamps:
                            // - Move this up to just after timestamp determination, so that it
                            //   appears in the log as soon as possible.
                            // - Do it for Constant peeks as well.
                            // - Slow-path peeks' timestamp logging is currently done by
                            //   `implement_peek_plan`. We could remove it from there and just do
                            //   it here.
                            if !matches!(fast_path_plan, FastPathPlan::Constant(..)) {
                                self.log_set_timestamp(
                                    *logging_id,
                                    determination.timestamp_context.timestamp_or_default(),
                                );
                            }
                        }

                        let row_set_finishing_seconds =
                            session.metrics().row_set_finishing_seconds().clone();

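                        // Tuning knobs for reading back peek results that the
                        // cluster stashed (e.g., large results) instead of
                        // returning them inline: the per-batch read size and
                        // the overall read memory budget.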
                        let peek_stash_read_batch_size_bytes =
                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
                                .get(catalog.system_config().dyncfgs());
                        let peek_stash_read_memory_budget_bytes =
                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
                                .get(catalog.system_config().dyncfgs());

                        self.implement_fast_path_peek_plan(
                            fast_path_plan,
                            determination.timestamp_context.timestamp_or_default(),
                            select_plan.finishing,
                            target_cluster_id,
                            target_replica,
                            typ,
                            max_result_size,
                            max_query_result_size,
                            row_set_finishing_seconds,
                            read_holds,
                            peek_stash_read_batch_size_bytes,
                            peek_stash_read_memory_budget_bytes,
                            session.conn_id().clone(),
                            source_ids,
                            watch_set,
                        )
                        .await?
                    }
                    PeekPlan::SlowPath(dataflow_plan) => {
                        if let Some(logging_id) = &statement_logging_id {
                            self.log_set_transient_index_id(*logging_id, dataflow_plan.id);
                        }

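                        // Slow-path peeks need a transient dataflow, so hand
                        // execution off to the coordinator, which owns dataflow
                        // creation and cleanup.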
                        self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
                            dataflow_plan: Box::new(dataflow_plan),
                            determination,
                            finishing: select_plan.finishing,
                            compute_instance: target_cluster_id,
                            target_replica,
                            intermediate_result_type: typ,
                            source_ids,
                            conn_id: session.conn_id().clone(),
                            max_result_size,
                            max_query_result_size,
                            watch_set,
                            tx,
                        })
                        .await?
                    }
                };

                // Emit a query timestamp notice if `emit_timestamp_notice` is enabled.
                if let Some(determination) = determination_for_notice {
                    let explanation = self
                        .call_coordinator(|tx| Command::ExplainTimestamp {
                            conn_id: session.conn_id().clone(),
                            session_wall_time: session.pcx().wall_time,
                            cluster_id: target_cluster_id,
                            id_bundle: input_id_bundle.clone(),
                            determination,
                            tx,
                        })
                        .await;
                    session.add_notice(AdapterNotice::QueryTimestamp { explanation });
                }

                Ok(Some(match select_plan.copy_to {
                    None => response,
                    // COPY TO STDOUT
                    Some(format) => ExecuteResponse::CopyTo {
                        format,
                        resp: Box::new(response),
                    },
                }))
            }
            Execution::CopyToS3 {
                global_lir_plan,
                source_ids,
            } => {
                let (df_desc, df_meta) = global_lir_plan.unapply();

                coord::sequencer::emit_optimizer_notices(
                    &*catalog,
                    session,
                    &df_meta.optimizer_notices,
                );

                // Extract the S3 sink connection info for the preflight check.
                let sink_id = df_desc.sink_id();
                let sinks = &df_desc.sink_exports;
                if sinks.len() != 1 {
                    return Err(AdapterError::Internal(
                        "expected exactly one copy to s3 sink".into(),
                    ));
                }
                let (_, sink_desc) = sinks
                    .first_key_value()
                    .expect("known to be exactly one copy to s3 sink");
                let s3_sink_connection = match &sink_desc.connection {
                    mz_compute_types::sinks::ComputeSinkConnection::CopyToS3Oneshot(conn) => {
                        conn.clone()
                    }
                    _ => {
                        return Err(AdapterError::Internal(
                            "expected copy to s3 oneshot sink".into(),
                        ));
                    }
                };

                // Perform the S3 preflight check in a background task (via the
                // coordinator). This runs slow S3 operations without blocking
                // the coordinator's main task.
                self.call_coordinator(|tx| Command::CopyToPreflight {
                    s3_sink_connection,
                    sink_id,
                    tx,
                })
                .await?;

                // Preflight succeeded; now execute the actual COPY TO dataflow.
                let watch_set = statement_logging_id.map(|logging_id| {
                    WatchSetCreation::new(
                        logging_id,
                        catalog.state(),
                        &input_id_bundle,
                        determination.timestamp_context.timestamp_or_default(),
                    )
                });

                let response = self
                    .call_coordinator(|tx| Command::ExecuteCopyTo {
                        df_desc: Box::new(df_desc),
                        compute_instance: target_cluster_id,
                        target_replica,
                        source_ids,
                        conn_id: session.conn_id().clone(),
                        watch_set,
                        tx,
                    })
                    .await?;

                Ok(Some(response))
            }
        }
    }

    /// Determines the timestamp for a query (similar to
    /// `Coordinator::determine_timestamp`), acquires read holds that ensure the
    /// query remains executable at that time, and returns both.
    ///
    /// The caller is responsible for eventually dropping the returned read
    /// holds.
    ///
    /// Note: `self` is taken `&mut` because of the lazy fetching in
    /// `get_compute_instance_client`.
    pub(crate) async fn frontend_determine_timestamp(
        &mut self,
        catalog_state: &CatalogState,
        session: &Session,
        id_bundle: &CollectionIdBundle,
        when: &QueryWhen,
        compute_instance: ComputeInstanceId,
        timeline_context: &TimelineContext,
        oracle_read_ts: Option<Timestamp>,
        real_time_recency_ts: Option<Timestamp>,
    ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
        // This logic is copied from `Coordinator::determine_timestamp`.

        let constraint_based = ConstraintBasedTimestampSelection::from_str(
            &CONSTRAINT_BASED_TIMESTAMP_SELECTION.get(catalog_state.system_config().dyncfgs()),
        );

        let isolation_level = session.vars().transaction_isolation();

        let (read_holds, upper) = self
            .acquire_read_holds_and_least_valid_write(id_bundle)
            .await
            .map_err(|err| {
                AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
                    err,
                    compute_instance,
                )
            })?;
        let (det, read_holds) = <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
            session,
            id_bundle,
            when,
            compute_instance,
            timeline_context,
            oracle_read_ts,
            real_time_recency_ts,
            isolation_level,
            &constraint_based,
            read_holds,
            upper.clone(),
        )?;

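        // Count this determination, labeled by whether we could respond
        // immediately, the isolation level, the target cluster, and the
        // timestamp selection strategy.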
        session
            .metrics()
            .determine_timestamp(&[
                match det.respond_immediately() {
                    true => "true",
                    false => "false",
                },
                isolation_level.as_str(),
                &compute_instance.to_string(),
                constraint_based.as_str(),
            ])
            .inc();
        if !det.respond_immediately()
            && isolation_level == &IsolationLevel::StrictSerializable
            && real_time_recency_ts.is_none()
        {
            // Record the difference between the StrictSerializable and
            // Serializable timestamps in a metric. To do so, re-determine the
            // timestamp as if the isolation level were Serializable.
            if let Some(strict) = det.timestamp_context.timestamp() {
                let (serializable_det, _tmp_read_holds) =
                    <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
                        session,
                        id_bundle,
                        when,
                        compute_instance,
                        timeline_context,
                        oracle_read_ts,
                        real_time_recency_ts,
                        &IsolationLevel::Serializable,
                        &constraint_based,
                        read_holds.clone(),
                        upper,
                    )?;
                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
                    session
                        .metrics()
                        .timestamp_difference_for_strict_serializable_ms(&[
                            compute_instance.to_string().as_ref(),
                            constraint_based.as_str(),
                        ])
                        .observe(f64::cast_lossy(u64::from(
                            strict.saturating_sub(*serializable),
                        )));
                }
            }
        }

        Ok((det, read_holds))
    }

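    /// Soft-asserts that `read_holds` covers all storage and compute imports
    /// of `execution` at its `as_of`, and that all compute holds are on the
    /// target cluster.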
    fn assert_read_holds_correct(
        read_holds: &ReadHolds<Timestamp>,
        execution: &Execution,
        determination: &TimestampDetermination<Timestamp>,
        target_cluster_id: ClusterId,
    ) {
        // Extract source_imports, index_imports, as_of, and execution_name
        // based on the `Execution` variant.
        let (source_imports, index_imports, as_of, execution_name): (
            Vec<GlobalId>,
            Vec<GlobalId>,
            Timestamp,
            &str,
        ) = match execution {
            Execution::Peek {
                global_lir_plan, ..
            } => match global_lir_plan.peek_plan() {
                PeekPlan::FastPath(fast_path_plan) => {
                    let (sources, indexes) = match fast_path_plan {
                        FastPathPlan::Constant(..) => (vec![], vec![]),
                        FastPathPlan::PeekExisting(_coll_id, idx_id, ..) => (vec![], vec![*idx_id]),
                        FastPathPlan::PeekPersist(global_id, ..) => (vec![*global_id], vec![]),
                    };
                    (
                        sources,
                        indexes,
                        determination.timestamp_context.timestamp_or_default(),
                        "FastPath",
                    )
                }
                PeekPlan::SlowPath(dataflow_plan) => {
                    let as_of = dataflow_plan
                        .desc
                        .as_of
                        .clone()
                        .expect("dataflow has an as_of")
                        .into_element();
                    (
                        dataflow_plan.desc.source_imports.keys().cloned().collect(),
                        dataflow_plan.desc.index_imports.keys().cloned().collect(),
                        as_of,
                        "SlowPath",
                    )
                }
            },
            Execution::CopyToS3 {
                global_lir_plan, ..
            } => {
                let df_desc = global_lir_plan.df_desc();
                let as_of = df_desc
                    .as_of
                    .clone()
                    .expect("dataflow has an as_of")
                    .into_element();
                (
                    df_desc.source_imports.keys().cloned().collect(),
                    df_desc.index_imports.keys().cloned().collect(),
                    as_of,
                    "CopyToS3",
                )
            }
            Execution::ExplainPlan { .. } | Execution::ExplainPushdown { .. } => {
                // No read-hold assertions are needed for the EXPLAIN variants.
                return;
            }
        };

        // Assert that we have read holds for all the imports of the dataflow.
        for id in source_imports.iter() {
            soft_assert_or_log!(
                read_holds.storage_holds.contains_key(id),
                "[{}] missing read hold for the source import {}",
                execution_name,
                id
            );
        }
        for id in index_imports.iter() {
            soft_assert_or_log!(
                read_holds
                    .compute_ids()
                    .map(|(_instance, coll)| coll)
                    .contains(id),
                "[{}] missing read hold for the index import {}",
                execution_name,
                id,
            );
        }

        // Also check the holds against the as_of.
        for (id, h) in read_holds.storage_holds.iter() {
            soft_assert_or_log!(
                h.since().less_equal(&as_of),
                "[{}] storage read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}",
                execution_name,
                h.since(),
                id,
                as_of,
                determination
            );
        }
        for ((instance, id), h) in read_holds.compute_holds.iter() {
            soft_assert_eq_or_log!(
                *instance,
                target_cluster_id,
                "[{}] the read hold on {} is on the wrong cluster",
                execution_name,
                id
            );
            soft_assert_or_log!(
                h.since().less_equal(&as_of),
                "[{}] compute read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}",
                execution_name,
                h.since(),
                id,
                as_of,
                determination
            );
        }
    }
}

/// The execution strategy to branch among after optimization.
enum Execution {
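    /// Execute the query as a peek, via either the fast or the slow path.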
    Peek {
        global_lir_plan: optimize::peek::GlobalLirPlan,
        optimization_finished_at: EpochMillis,
        plan_insights_optimizer_trace: Option<OptimizerTrace>,
        insights_ctx: Option<Box<PlanInsightsContext>>,
    },
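    /// Execute the query as a one-shot `COPY ... TO` S3 sink dataflow.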
    CopyToS3 {
        global_lir_plan: optimize::copy_to::GlobalLirPlan,
        source_ids: BTreeSet<GlobalId>,
    },
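    /// Return rendered `EXPLAIN` output instead of executing the query.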
    ExplainPlan {
        df_meta: DataflowMetainfo,
        explain_ctx: ExplainPlanContext,
        optimizer: optimize::peek::Optimizer,
        insights_ctx: Option<Box<PlanInsightsContext>>,
    },
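    /// Return `EXPLAIN FILTER PUSHDOWN` results, computed from the imports'
    /// `MapFilterProject`s and the timestamp determination.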
    ExplainPushdown {
        imports: BTreeMap<GlobalId, mz_expr::MapFilterProject>,
        determination: TimestampDetermination<Timestamp>,
    },
}