mz_adapter/frontend_peek.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::sync::Arc;

use itertools::{Either, Itertools};
use mz_adapter_types::dyncfgs::CONSTRAINT_BASED_TIMESTAMP_SELECTION;
use mz_adapter_types::timestamp_selection::ConstraintBasedTimestampSelection;
use mz_compute_types::ComputeInstanceId;
use mz_expr::{CollectionPlan, ResultSpec};
use mz_ore::cast::{CastFrom, CastLossy};
use mz_ore::collections::CollectionExt;
use mz_ore::now::EpochMillis;
use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
use mz_repr::role_id::RoleId;
use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
use mz_sql::ast::Raw;
use mz_sql::catalog::CatalogCluster;
use mz_sql::plan::Params;
use mz_sql::plan::{self, Plan, QueryWhen};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::IsolationLevel;
use mz_sql_parser::ast::{CopyDirection, CopyRelation, ExplainStage, ShowStatement, Statement};
use mz_transform::EmptyStatisticsOracle;
use mz_transform::dataflow::DataflowMetainfo;
use opentelemetry::trace::TraceContextExt;
use tracing::{Span, debug};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::catalog::{Catalog, CatalogState};
use crate::command::Command;
use crate::coord::peek::{FastPathPlan, PeekPlan};
use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle};
use crate::coord::timeline::timedomain_for;
use crate::coord::timestamp_selection::TimestampDetermination;
use crate::coord::{
    Coordinator, CopyToContext, ExecuteContextGuard, ExplainContext, ExplainPlanContext,
    TargetCluster,
};
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::OptimizerTrace;
use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
use crate::optimize::{Optimize, OptimizerError};
use crate::session::{Session, TransactionOps, TransactionStatus};
use crate::statement_logging::WatchSetCreation;
use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
use crate::{
    AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, PeekClient, ReadHolds,
    TimelineContext, TimestampContext, TimestampProvider, optimize,
};
use crate::{coord, metrics};

impl PeekClient {
    /// Attempt to sequence a peek from the session task.
    ///
    /// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the
    /// Coordinator's sequencing. Errors should be returned to the user.
    ///
    /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
    /// triggering the execution of the underlying query.
    pub(crate) async fn try_frontend_peek(
        &mut self,
        portal_name: &str,
        session: &mut Session,
        outer_ctx_extra: &mut Option<ExecuteContextGuard>,
    ) -> Result<Option<ExecuteResponse>, AdapterError> {
        // # From handle_execute

        if session.vars().emit_trace_id_notice() {
            let span_context = tracing::Span::current()
                .context()
                .span()
                .span_context()
                .clone();
            if span_context.is_valid() {
                session.add_notice(AdapterNotice::QueryTrace {
                    trace_id: span_context.trace_id(),
                });
            }
        }

        // TODO(peek-seq): This snapshot is wasted when we end up bailing out from the frontend peek
        // sequencing. We could solve this with the optimization where we continuously keep a
        // catalog snapshot in the session, and only get a new one when the catalog revision has
        // changed, which we could check with an atomic read.
        // But anyhow, this problem will just go away once we reach the point where we never fall
        // back to the old sequencing.
        let catalog = self.catalog_snapshot("try_frontend_peek").await;

        // Extract things from the portal.
        let (stmt, params, logging, lifecycle_timestamps) = {
            if let Err(err) = Coordinator::verify_portal(&*catalog, session, portal_name) {
                outer_ctx_extra
                    .take()
                    .and_then(|guard| guard.defuse().retire());
                return Err(err);
            }
            let portal = session
                .get_portal_unverified(portal_name)
                // The portal is a session-level thing, so it couldn't have concurrently disappeared
                // since the above verification.
                .expect("called verify_portal above");
            let params = portal.parameters.clone();
            let stmt = portal.stmt.clone();
            let logging = Arc::clone(&portal.logging);
            let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
            (stmt, params, logging, lifecycle_timestamps)
        };

        // Before planning, check if this is a statement type we can handle.
        // This must happen BEFORE statement logging setup to avoid orphaned execution records.
        if let Some(ref stmt) = stmt {
            match &**stmt {
                Statement::Select(_)
                | Statement::ExplainAnalyzeObject(_)
                | Statement::ExplainAnalyzeCluster(_)
                | Statement::Show(ShowStatement::ShowObjects(_))
                | Statement::Show(ShowStatement::ShowColumns(_)) => {
                    // These are always fine, just continue.
                    // Note: EXPLAIN ANALYZE will `plan` to `Plan::Select`.
                    // Note: ShowObjects plans to `Plan::Select`, ShowColumns plans to `Plan::ShowColumns`.
                    // We handle `Plan::ShowColumns` specially in `try_frontend_peek_inner`.
                }
                Statement::ExplainPlan(explain_stmt) => {
                    // Only handle ExplainPlan for SELECT statements.
                    // We don't want to handle e.g. EXPLAIN CREATE MATERIALIZED VIEW here, because that
                    // requires purification before planning, which the frontend peek sequencing doesn't
                    // do.
                    match &explain_stmt.explainee {
                        mz_sql_parser::ast::Explainee::Select(..) => {
                            // This is a SELECT, continue
                        }
                        _ => {
                            debug!(
                                "Bailing out from try_frontend_peek, because EXPLAIN is not for a SELECT query"
                            );
                            return Ok(None);
                        }
                    }
                }
                Statement::ExplainPushdown(explain_stmt) => {
                    // Only handle EXPLAIN FILTER PUSHDOWN for non-BROKEN SELECT statements
                    match &explain_stmt.explainee {
                        mz_sql_parser::ast::Explainee::Select(_, false) => {}
                        _ => {
                            debug!(
                                "Bailing out from try_frontend_peek, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is for EXPLAIN BROKEN"
                            );
                            return Ok(None);
                        }
                    }
                }
                Statement::Copy(copy_stmt) => {
                    match &copy_stmt.direction {
                        CopyDirection::To => {
                            // Check for SUBSCRIBE inside COPY TO - we don't handle Plan::Subscribe
                            if matches!(&copy_stmt.relation, CopyRelation::Subscribe(_)) {
                                debug!(
                                    "Bailing out from try_frontend_peek, because COPY (SUBSCRIBE ...) TO is not supported"
                                );
                                return Ok(None);
                            }
                            // This is COPY TO (SELECT), continue
                        }
                        CopyDirection::From => {
                            debug!(
                                "Bailing out from try_frontend_peek, because COPY FROM is not supported"
                            );
                            return Ok(None);
                        }
                    }
                }
                _ => {
                    debug!(
                        "Bailing out from try_frontend_peek, because statement type is not supported"
                    );
                    return Ok(None);
                }
            }
        }

        // Set up statement logging, and log the beginning of execution.
        // (But only if we're not executing in the context of another statement.)
        let statement_logging_id = if outer_ctx_extra.is_none() {
            // This is a new statement, so begin statement logging
            let result = self.statement_logging_frontend.begin_statement_execution(
                session,
                &params,
                &logging,
                catalog.system_config(),
                lifecycle_timestamps,
            );

            if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result {
                self.log_began_execution(began_execution, mseh_update, prepared_statement);
                Some(logging_id)
            } else {
                None
            }
        } else {
            // We're executing in the context of another statement (e.g., FETCH),
            // so extract the statement logging ID from the outer context if present.
            // We take ownership and retire the outer context here. The end of execution will be
            // logged in one of the following ways:
            // - At the end of this function, if the execution is finished by then.
            // - Later by the Coordinator, either due to RegisterFrontendPeek or ExecuteSlowPathPeek.
            outer_ctx_extra
                .take()
                .and_then(|guard| guard.defuse().retire())
        };

        let result = self
            .try_frontend_peek_inner(session, catalog, stmt, params, statement_logging_id)
            .await;

        // Log the end of execution if we are logging this statement and execution has already
        // ended.
        if let Some(logging_id) = statement_logging_id {
            let reason = match &result {
                // Streaming results are handled asynchronously by the coordinator
                Ok(Some(ExecuteResponse::SendingRowsStreaming { .. })) => {
                    // Don't log here - the peek is still executing.
                    // It will be logged when handle_peek_notification is called.
                    return result;
                }
                // COPY TO needs to check its inner response
                Ok(Some(resp @ ExecuteResponse::CopyTo { resp: inner, .. })) => {
                    match inner.as_ref() {
                        ExecuteResponse::SendingRowsStreaming { .. } => {
                            // Don't log here - the peek is still executing.
                            // It will be logged when handle_peek_notification is called.
                            return result;
                        }
                        // For non-streaming COPY TO responses, use the outer CopyTo for conversion
                        _ => resp.into(),
                    }
                }
                // Bailout case, which should not happen
                Ok(None) => {
                    soft_panic_or_log!(
                        "Bailed out from `try_frontend_peek_inner` after we already logged the beginning of statement execution."
                    );
                    // This statement will be handled by the old peek sequencing, which will do its
                    // own statement logging from the beginning. So, let's close out this one.
                    self.log_ended_execution(
                        logging_id,
                        StatementEndedExecutionReason::Errored {
                            error: "Internal error: bailed out from `try_frontend_peek_inner`"
                                .to_string(),
                        },
                    );
                    return result;
                }
                // All other success responses - use the From implementation
                // TODO(peek-seq): After we delete the old peek sequencing, we'll be able to adjust
                // the From implementation to do exactly what we need in the frontend peek
                // sequencing, so that the above special cases won't be needed.
                Ok(Some(resp)) => resp.into(),
                Err(e) => StatementEndedExecutionReason::Errored {
                    error: e.to_string(),
                },
            };

            self.log_ended_execution(logging_id, reason);
        }

        result
    }

    /// This is encapsulated in an inner function so that the outer function can still do statement
    /// logging after early returns (`?`) from the inner function.
    async fn try_frontend_peek_inner(
        &mut self,
        session: &mut Session,
        catalog: Arc<Catalog>,
        stmt: Option<Arc<Statement<Raw>>>,
        params: Params,
        statement_logging_id: Option<crate::statement_logging::StatementLoggingId>,
    ) -> Result<Option<ExecuteResponse>, AdapterError> {
        let stmt = match stmt {
            Some(stmt) => stmt,
            None => {
                debug!("try_frontend_peek_inner succeeded on an empty query");
                return Ok(Some(ExecuteResponse::EmptyQuery));
            }
        };

        session
            .metrics()
            .query_total(&[
                metrics::session_type_label_value(session.user()),
                metrics::statement_type_label_value(&stmt),
            ])
            .inc();

        // # From handle_execute_inner

        let conn_catalog = catalog.for_session(session);
        // (`resolved_ids` should be derivable from `stmt`. If `stmt` is later transformed to
        // remove/add IDs, then `resolved_ids` should be updated to also remove/add those IDs.)
        let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?;

        let pcx = session.pcx();
        let plan = mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, &params, &resolved_ids)?;

        let (select_plan, explain_ctx, copy_to_ctx) = match &plan {
            Plan::Select(select_plan) => {
                let explain_ctx = if session.vars().emit_plan_insights_notice() {
                    let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
                    ExplainContext::PlanInsightsNotice(optimizer_trace)
                } else {
                    ExplainContext::None
                };
                (select_plan, explain_ctx, None)
            }
            Plan::ShowColumns(show_columns_plan) => {
                // ShowColumns wraps a SelectPlan, extract it and proceed as normal.
                (&show_columns_plan.select_plan, ExplainContext::None, None)
            }
            Plan::ExplainPlan(plan::ExplainPlanPlan {
                stage,
                format,
                config,
                explainee:
                    plan::Explainee::Statement(plan::ExplaineeStatement::Select { broken, plan, desc }),
            }) => {
                // Create OptimizerTrace to collect optimizer plans
                let optimizer_trace = OptimizerTrace::new(stage.paths());
                let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
                    broken: *broken,
                    config: config.clone(),
                    format: *format,
                    stage: *stage,
                    replan: None,
                    desc: Some(desc.clone()),
                    optimizer_trace,
                });
                (plan, explain_ctx, None)
            }
            // COPY TO S3
            Plan::CopyTo(plan::CopyToPlan {
                select_plan,
                desc,
                to,
                connection,
                connection_id,
                format,
                max_file_size,
            }) => {
                let uri = eval_copy_to_uri(to.clone(), session, catalog.state())?;

                // (output_batch_count will be set later)
                let copy_to_ctx = CopyToContext {
                    desc: desc.clone(),
                    uri,
                    connection: connection.clone(),
                    connection_id: *connection_id,
                    format: format.clone(),
                    max_file_size: *max_file_size,
                    output_batch_count: None,
                };

                (select_plan, ExplainContext::None, Some(copy_to_ctx))
            }
            Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => {
                // Only handle EXPLAIN FILTER PUSHDOWN for SELECT statements
                match explainee {
                    plan::Explainee::Statement(plan::ExplaineeStatement::Select {
                        broken: false,
                        plan,
                        desc: _,
                    }) => {
                        let explain_ctx = ExplainContext::Pushdown;
                        (plan, explain_ctx, None)
                    }
                    _ => {
                        // This shouldn't happen because we already checked for this at the AST
                        // level before calling `try_frontend_peek_inner`.
                        soft_panic_or_log!(
                            "unexpected EXPLAIN FILTER PUSHDOWN plan kind in frontend peek sequencing: {:?}",
                            explainee
                        );
                        debug!(
                            "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN"
                        );
                        return Ok(None);
                    }
                }
            }
            Plan::SideEffectingFunc(sef_plan) => {
                // Side-effecting functions need Coordinator state (e.g., active_conns),
                // so delegate to the Coordinator via a Command.
                // The RBAC check is performed in the Coordinator where active_conns is available.
                let response = self
                    .call_coordinator(|tx| Command::ExecuteSideEffectingFunc {
                        plan: sef_plan.clone(),
                        conn_id: session.conn_id().clone(),
                        current_role: session.role_metadata().current_role,
                        tx,
                    })
                    .await?;
                return Ok(Some(response));
            }
            _ => {
                // This shouldn't happen because we already checked for this at the AST
                // level before calling `try_frontend_peek_inner`.
                soft_panic_or_log!(
                    "Unexpected plan kind in frontend peek sequencing: {:?}",
                    plan
                );
                debug!(
                    "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, side-effecting SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO S3"
                );
                return Ok(None);
            }
        };

        // # From sequence_plan

        // We have checked the plan kind above.
        assert!(plan.allowed_in_read_only());

        let target_cluster = match session.transaction().cluster() {
            // Use the current transaction's cluster.
            Some(cluster_id) => TargetCluster::Transaction(cluster_id),
            // If there isn't a current cluster set for a transaction, then try to auto route.
            None => {
                coord::catalog_serving::auto_run_on_catalog_server(&conn_catalog, session, &plan)
            }
        };
        let (cluster, target_cluster_id, target_cluster_name) = {
            let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
            (cluster, cluster.id, &cluster.name)
        };

        // Log cluster selection
        if let Some(logging_id) = &statement_logging_id {
            self.log_set_cluster(*logging_id, target_cluster_id);
        }

        coord::catalog_serving::check_cluster_restrictions(
            target_cluster_name.as_str(),
            &conn_catalog,
            &plan,
        )?;

        rbac::check_plan(
            &conn_catalog,
            // We can't look at `active_conns` here, but that's ok, because the only case that
            // needs it was already handled above via `Command::ExecuteSideEffectingFunc`.
            None::<fn(u32) -> Option<RoleId>>,
            session,
            &plan,
            Some(target_cluster_id),
            &resolved_ids,
        )?;

        if let Some((_, wait_future)) =
            coord::appends::waiting_on_startup_appends(&*catalog, session, &plan)
        {
            wait_future.await;
        }

        let max_query_result_size = Some(session.vars().max_query_result_size());

        // # From sequence_peek

        // # From peek_validate

        let compute_instance_snapshot =
            ComputeInstanceSnapshot::new_without_collections(cluster.id());

        let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config())
            .override_from(&catalog.get_cluster(cluster.id()).config.features())
            .override_from(&explain_ctx);

        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
            return Err(AdapterError::NoClusterReplicasAvailable {
                name: cluster.name.clone(),
                is_managed: cluster.is_managed(),
            });
        }

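        // Allocate transient IDs for the optimizer: `view_id` names the optimized query as a
        // temporary view, and `index_id` names the temporary index/arrangement that a slow-path
        // peek reads from. Neither ID is registered in the catalog.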
        let (_, view_id) = self.transient_id_gen.allocate_id();
        let (_, index_id) = self.transient_id_gen.allocate_id();

        let mut optimizer = if let Some(mut copy_to_ctx) = copy_to_ctx {
            // COPY TO path: calculate output_batch_count and create copy_to optimizer
            let worker_counts = cluster.replicas().map(|r| {
                let loc = &r.config.location;
                loc.workers().unwrap_or_else(|| loc.num_processes())
            });
            let max_worker_count = match worker_counts.max() {
                Some(count) => u64::cast_from(count),
                None => {
                    return Err(AdapterError::NoClusterReplicasAvailable {
                        name: cluster.name.clone(),
                        is_managed: cluster.is_managed(),
                    });
                }
            };
            copy_to_ctx.output_batch_count = Some(max_worker_count);

            Either::Right(optimize::copy_to::Optimizer::new(
                Arc::clone(&catalog),
                compute_instance_snapshot.clone(),
                view_id,
                copy_to_ctx,
                optimizer_config,
                self.optimizer_metrics.clone(),
            ))
        } else {
            // SELECT/EXPLAIN path: create peek optimizer
            Either::Left(optimize::peek::Optimizer::new(
                Arc::clone(&catalog),
                compute_instance_snapshot.clone(),
                select_plan.finishing.clone(),
                view_id,
                index_id,
                optimizer_config,
                self.optimizer_metrics.clone(),
            ))
        };

        let target_replica_name = session.vars().cluster_replica();
        let mut target_replica = target_replica_name
            .map(|name| {
                cluster
                    .replica_id(name)
                    .ok_or(AdapterError::UnknownClusterReplica {
                        cluster_name: cluster.name.clone(),
                        replica_name: name.to_string(),
                    })
            })
            .transpose()?;

        let source_ids = select_plan.source.depends_on();
        // TODO(peek-seq): validate_timeline_context can be expensive in real scenarios (not in
        // simple benchmarks), because it traverses transitive dependencies even of indexed views and
        // materialized views (also traversing their MIR plans).
        let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?;
        if matches!(timeline_context, TimelineContext::TimestampIndependent)
            && select_plan.source.contains_temporal()?
        {
            // If the source IDs are timestamp independent but the query contains temporal functions,
            // then the timeline context needs to be upgraded to timestamp dependent. This is
            // required because `source_ids` doesn't contain functions.
            timeline_context = TimelineContext::TimestampDependent;
        }

        let notices = coord::sequencer::check_log_reads(
            &catalog,
            cluster,
            &source_ids,
            &mut target_replica,
            session.vars(),
        )?;
        session.add_notices(notices);

        // # From peek_linearize_timestamp

        let isolation_level = session.vars().transaction_isolation().clone();
        let timeline = Coordinator::get_timeline(&timeline_context);
        let needs_linearized_read_ts =
            Coordinator::needs_linearized_read_ts(&isolation_level, &select_plan.when);

        let oracle_read_ts = match timeline {
            Some(timeline) if needs_linearized_read_ts => {
                let oracle = self.ensure_oracle(timeline).await?;
                let oracle_read_ts = oracle.read_ts().await;
                Some(oracle_read_ts)
            }
            Some(_) | None => None,
        };

        // # From peek_real_time_recency

        let vars = session.vars();
        let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
            && !session.contains_read_timestamp()
        {
            // Only call the coordinator when we actually need real-time recency
            self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
                source_ids: source_ids.clone(),
                real_time_recency_timeout: *vars.real_time_recency_timeout(),
                tx,
            })
            .await?
        } else {
            None
        };

        // # From peek_timestamp_read_hold

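        // Determine the bundle of compute (index) and storage collections that is sufficient to
        // serve the query's inputs on the target cluster; timestamp selection and read holds
        // below operate on this bundle.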
        let dataflow_builder = DataflowBuilder::new(catalog.state(), compute_instance_snapshot);
        let input_id_bundle = dataflow_builder.sufficient_collections(source_ids.clone());

        // ## From sequence_peek_timestamp

        // Warning: This will be false for AS OF queries, even if we are otherwise inside a
        // multi-statement transaction. (It's also false for FreshestTableWrite, which is currently
        // only read-then-write queries, which can't be part of multi-statement transactions, so
        // FreshestTableWrite doesn't matter.)
        //
        // TODO(peek-seq): It's not totally clear to me what the intended semantics are for AS OF
        // queries inside a transaction: We clearly can't use the transaction timestamp, but the old
        // peek sequencing still does a timedomain validation. The new peek sequencing does not do
        // timedomain validation for AS OF queries, which seems more natural. But it would probably
        // be cleanest to simply disallow AS OF queries inside transactions.
        let in_immediate_multi_stmt_txn = session
            .transaction()
            .in_immediate_multi_stmt_txn(&select_plan.when);

        // Fetch or generate a timestamp for this query and fetch or acquire read holds.
        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
            // Use the transaction's timestamp if it exists and this isn't an AS OF query.
            // (`in_immediate_multi_stmt_txn` is false for AS OF queries.)
            Some(
                determination @ TimestampDetermination {
                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
                    ..
                },
            ) if in_immediate_multi_stmt_txn => {
                // This is a subsequent (non-AS OF, non-constant) query in a multi-statement
                // transaction. We now:
                // - Validate that the query only accesses collections within the transaction's
                //   timedomain (which we know from the stored read holds).
                // - Use the transaction's stored timestamp determination.
                // - Use the (relevant subset of the) transaction's read holds.

                let txn_read_holds_opt = self
                    .call_coordinator(|tx| Command::GetTransactionReadHoldsBundle {
                        conn_id: session.conn_id().clone(),
                        tx,
                    })
                    .await;

                if let Some(txn_read_holds) = txn_read_holds_opt {
                    let allowed_id_bundle = txn_read_holds.id_bundle();
                    let outside = input_id_bundle.difference(&allowed_id_bundle);

                    // Queries without a timestamp and timeline can belong to any existing timedomain.
                    if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
                        let valid_names =
                            allowed_id_bundle.resolve_names(&*catalog, session.conn_id());
                        let invalid_names = outside.resolve_names(&*catalog, session.conn_id());
                        return Err(AdapterError::RelationOutsideTimeDomain {
                            relations: invalid_names,
                            names: valid_names,
                        });
                    }

                    // Extract the subset of read holds for the collections this query accesses.
                    let read_holds = txn_read_holds.subset(&input_id_bundle);

                    (determination, read_holds)
                } else {
                    // This should never happen: we're in a subsequent query of a multi-statement
                    // transaction (we have a transaction timestamp), but the coordinator has no
                    // transaction read holds stored. This indicates a bug in the transaction
                    // handling.
                    return Err(AdapterError::Internal(
                        "Missing transaction read holds for multi-statement transaction"
                            .to_string(),
                    ));
                }
            }
            _ => {
                // There is no timestamp determination yet for this transaction. Either:
                // - We are not in a multi-statement transaction.
                // - This is the first (non-AS OF) query in a multi-statement transaction.
                // - This is an AS OF query.
                // - This is a constant query (`TimestampContext::NoTimestamp`).

                let timedomain_bundle;
                let determine_bundle = if in_immediate_multi_stmt_txn {
                    // This is the first (non-AS OF) query in a multi-statement transaction.
                    // Determine a timestamp that will be valid for anything in any schema
                    // referenced by the first query.
                    timedomain_bundle = timedomain_for(
                        &*catalog,
                        &dataflow_builder,
                        &source_ids,
                        &timeline_context,
                        session.conn_id(),
                        target_cluster_id,
                    )?;
                    &timedomain_bundle
                } else {
                    // Simply use the inputs of the current query.
                    &input_id_bundle
                };
                let (determination, read_holds) = self
                    .frontend_determine_timestamp(
                        catalog.state(),
                        session,
                        determine_bundle,
                        &select_plan.when,
                        target_cluster_id,
                        &timeline_context,
                        oracle_read_ts,
                        real_time_recency_ts,
                    )
                    .await?;

                // If this is the first (non-AS OF) query in a multi-statement transaction, store
                // the read holds in the coordinator, so subsequent queries can validate against
                // them.
                if in_immediate_multi_stmt_txn {
                    self.call_coordinator(|tx| Command::StoreTransactionReadHolds {
                        conn_id: session.conn_id().clone(),
                        read_holds: read_holds.clone(),
                        tx,
                    })
                    .await;
                }

                (determination, read_holds)
            }
        };

        {
            // Assert that we have a read hold for all the collections in our `input_id_bundle`.
            for id in input_id_bundle.iter() {
                let s = read_holds.storage_holds.contains_key(&id);
                let c = read_holds
                    .compute_ids()
                    .map(|(_instance, coll)| coll)
                    .contains(&id);
                soft_assert_or_log!(
                    s || c,
                    "missing read hold for collection {} in `input_id_bundle`",
                    id
                );
            }

            // Assert that each part of the `input_id_bundle` corresponds to the right part of
            // `read_holds`.
            for id in input_id_bundle.storage_ids.iter() {
                soft_assert_or_log!(
                    read_holds.storage_holds.contains_key(id),
                    "missing storage read hold for collection {} in `input_id_bundle`",
                    id
                );
            }
            for id in input_id_bundle
                .compute_ids
                .iter()
                .flat_map(|(_instance, colls)| colls)
            {
                soft_assert_or_log!(
                    read_holds
                        .compute_ids()
                        .map(|(_instance, coll)| coll)
                        .contains(id),
                    "missing compute read hold for collection {} in `input_id_bundle`",
                    id,
                );
            }
        }

        // (TODO(peek-seq): The below TODO is copied from the old peek sequencing. We should resolve
        // this when we decide what to do with `AS OF` in transactions.)
        // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
        // arbitrary and we don't recall why we did it (possibly an error!). Change this to always
        // set the transaction ops. Decide and document what our policy should be on AS OF queries.
        // Maybe they shouldn't be allowed in transactions at all because it's hard to explain
        // what's going on there. This should probably get a small design document.

        // We only track the peeks in the session if the query doesn't use AS
        // OF or we're inside an explicit transaction. The latter case is
        // necessary to support PG's `BEGIN` semantics, whose behavior can
        // depend on whether or not reads have occurred in the txn.
        let requires_linearization = (&explain_ctx).into();
        let mut transaction_determination = determination.clone();
        if select_plan.when.is_transactional() {
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id: target_cluster_id,
                requires_linearization,
            })?;
        } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
            // If the query uses AS OF, then ignore the timestamp.
            transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id: target_cluster_id,
                requires_linearization,
            })?;
        };

        // # From peek_optimize

        let stats = statistics_oracle(
            session,
            &source_ids,
            &determination.timestamp_context.antichain(),
            true,
            catalog.system_config(),
            &*self.storage_collections,
        )
        .await
        .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));

        // Generate data structures that can be moved to another task where we will perform possibly
        // expensive optimizations.
        let timestamp_context = determination.timestamp_context.clone();
        let session_meta = session.meta();
        let now = catalog.config().now.clone();
        let select_plan = select_plan.clone();
        let target_cluster_name = target_cluster_name.clone();
        let needs_plan_insights = explain_ctx.needs_plan_insights();
        let determination_for_pushdown = if matches!(explain_ctx, ExplainContext::Pushdown) {
            // This is a hairy data structure, so avoid this clone if we are not in
            // EXPLAIN FILTER PUSHDOWN.
            Some(determination.clone())
        } else {
            None
        };

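        // Capture the current tracing span so that the optimization work on the blocking task
        // below is recorded as part of this request's trace.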
        let span = Span::current();

        // Prepare data for plan insights if needed
        let catalog_for_insights = if needs_plan_insights {
            Some(Arc::clone(&catalog))
        } else {
            None
        };
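        // Plan insights may re-optimize the query against every user cluster (e.g., to report
        // which clusters could serve it via a fast path), so snapshot them all up front.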
        let mut compute_instances = BTreeMap::new();
        if needs_plan_insights {
            for user_cluster in catalog.user_clusters() {
                let snapshot = ComputeInstanceSnapshot::new_without_collections(user_cluster.id);
                compute_instances.insert(user_cluster.name.clone(), snapshot);
            }
        }

        // Enum for branching among various execution steps after optimization
        enum Execution {
            Peek {
                global_lir_plan: optimize::peek::GlobalLirPlan,
                optimization_finished_at: EpochMillis,
                plan_insights_optimizer_trace: Option<OptimizerTrace>,
                insights_ctx: Option<Box<PlanInsightsContext>>,
            },
            CopyToS3 {
                global_lir_plan: optimize::copy_to::GlobalLirPlan,
                source_ids: BTreeSet<GlobalId>,
            },
            ExplainPlan {
                df_meta: DataflowMetainfo,
                explain_ctx: ExplainPlanContext,
                optimizer: optimize::peek::Optimizer,
                insights_ctx: Option<Box<PlanInsightsContext>>,
            },
            ExplainPushdown {
                imports: BTreeMap<GlobalId, mz_expr::MapFilterProject>,
                determination: TimestampDetermination<Timestamp>,
            },
        }

        let source_ids_for_closure = source_ids.clone();
        let optimization_result = mz_ore::task::spawn_blocking(
            || "optimize peek",
            move || {
                span.in_scope(|| {
                    let _dispatch_guard = explain_ctx.dispatch_guard();

                    let raw_expr = select_plan.source.clone();

                    // The purpose of wrapping the following in a closure is to control where the
                    // `?`s return from, so that even when a `catch_unwind_optimize` call fails,
                    // we can still handle `EXPLAIN BROKEN`.
                    let pipeline = || -> Result<Either<optimize::peek::GlobalLirPlan, optimize::copy_to::GlobalLirPlan>, OptimizerError> {
                        match optimizer.as_mut() {
                            Either::Left(optimizer) => {
                                // SELECT/EXPLAIN path
                                // HIR ⇒ MIR lowering and MIR optimization (local)
                                let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr.clone())?;
                                // Attach resolved context required to continue the pipeline.
                                let local_mir_plan =
                                    local_mir_plan.resolve(timestamp_context.clone(), &session_meta, stats);
                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
                                let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
                                Ok(Either::Left(global_lir_plan))
                            }
                            Either::Right(optimizer) => {
                                // COPY TO path
                                // HIR ⇒ MIR lowering and MIR optimization (local)
                                let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr.clone())?;
                                // Attach resolved context required to continue the pipeline.
                                let local_mir_plan =
                                    local_mir_plan.resolve(timestamp_context.clone(), &session_meta, stats);
                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
                                let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
                                Ok(Either::Right(global_lir_plan))
                            }
                        }
                    };

                    let global_lir_plan_result = pipeline();
                    let optimization_finished_at = now();

                    let create_insights_ctx = |optimizer: &optimize::peek::Optimizer, is_notice: bool| -> Option<Box<PlanInsightsContext>> {
                        if !needs_plan_insights {
                            return None;
                        }

                        let catalog = catalog_for_insights.as_ref()?;

                        let enable_re_optimize = if needs_plan_insights {
                            // Disable any plan insights that use the optimizer if we only want the
                            // notice and plan optimization took longer than the threshold. This
                            // prevents a situation where optimization is slow and there are many
                            // clusters, in which case re-optimizing for every cluster would delay
                            // peek execution by roughly (optimization time) x (number of clusters).
                            //
                            // (This heuristic doesn't work well, see #9492.)
                            let opt_limit = mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
                                .get(catalog.system_config().dyncfgs());
                            !(is_notice && optimizer.duration() > opt_limit)
                        } else {
                            false
                        };

                        Some(Box::new(PlanInsightsContext {
                            stmt: select_plan.select.as_deref().map(Clone::clone).map(Statement::Select),
                            raw_expr: raw_expr.clone(),
                            catalog: Arc::clone(catalog),
                            compute_instances,
                            target_instance: target_cluster_name,
                            metrics: optimizer.metrics().clone(),
                            finishing: optimizer.finishing().clone(),
                            optimizer_config: optimizer.config().clone(),
                            session: session_meta,
                            timestamp_context,
                            view_id: optimizer.select_id(),
                            index_id: optimizer.index_id(),
                            enable_re_optimize,
                        }))
                    };

                    match global_lir_plan_result {
                        Ok(Either::Left(global_lir_plan)) => {
                            // SELECT/EXPLAIN path
                            let optimizer = optimizer.unwrap_left();
                            match explain_ctx {
                                ExplainContext::Plan(explain_ctx) => {
                                    let (_, df_meta, _) = global_lir_plan.unapply();
                                    let insights_ctx = create_insights_ctx(&optimizer, false);
                                    Ok(Execution::ExplainPlan {
                                        df_meta,
                                        explain_ctx,
                                        optimizer,
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::None => {
                                    Ok(Execution::Peek {
                                        global_lir_plan,
                                        optimization_finished_at,
                                        plan_insights_optimizer_trace: None,
                                        insights_ctx: None,
                                    })
                                }
                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
                                    let insights_ctx = create_insights_ctx(&optimizer, true);
                                    Ok(Execution::Peek {
                                        global_lir_plan,
                                        optimization_finished_at,
                                        plan_insights_optimizer_trace: Some(optimizer_trace),
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::Pushdown => {
                                    let (plan, _, _) = global_lir_plan.unapply();
                                    let imports = match plan {
                                        PeekPlan::SlowPath(plan) => plan
                                            .desc
                                            .source_imports
                                            .into_iter()
                                            .filter_map(|(id, (desc, _, _upper))| {
                                                desc.arguments.operators.map(|mfp| (id, mfp))
                                            })
                                            .collect(),
                                        PeekPlan::FastPath(_) => std::collections::BTreeMap::default(),
                                    };
                                    Ok(Execution::ExplainPushdown {
                                        imports,
                                        determination: determination_for_pushdown.expect("it's present for the ExplainPushdown case"),
                                    })
                                }
                            }
                        }
                        Ok(Either::Right(global_lir_plan)) => {
                            // COPY TO S3 path
                            Ok(Execution::CopyToS3 {
                                global_lir_plan,
                                source_ids: source_ids_for_closure,
                            })
                        }
                        Err(err) => {
                            if optimizer.is_right() {
                                // COPY TO has no EXPLAIN BROKEN support
                                return Err(err);
                            }
                            // SELECT/EXPLAIN error handling
                            let optimizer = optimizer.expect_left("checked above");
                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
                                if explain_ctx.broken {
                                    // EXPLAIN BROKEN: log error and continue with defaults
                                    tracing::error!("error while handling EXPLAIN statement: {}", err);
                                    Ok(Execution::ExplainPlan {
                                        df_meta: Default::default(),
                                        explain_ctx,
                                        optimizer,
                                        insights_ctx: None,
                                    })
                                } else {
                                    Err(err)
                                }
                            } else {
                                Err(err)
                            }
                        }
                    }
                })
            },
        )
        .await
        .map_err(|optimizer_error| {
            AdapterError::Internal(format!("internal error in optimizer: {}", optimizer_error))
        })?;
1042
1043        // Log optimization finished
1044        if let Some(logging_id) = &statement_logging_id {
1045            self.log_lifecycle_event(*logging_id, StatementLifecycleEvent::OptimizationFinished);
1046        }
1047
1048        // Handle the optimization result: either generate EXPLAIN output or continue with execution
1049        match optimization_result {
1050            Execution::ExplainPlan {
1051                df_meta,
1052                explain_ctx,
1053                optimizer,
1054                insights_ctx,
1055            } => {
1056                let rows = coord::sequencer::explain_plan_inner(
1057                    session,
1058                    &catalog,
1059                    df_meta,
1060                    explain_ctx,
1061                    optimizer,
1062                    insights_ctx,
1063                )
1064                .await?;
1065
1066                Ok(Some(ExecuteResponse::SendingRowsImmediate {
1067                    rows: Box::new(rows.into_row_iter()),
1068                }))
1069            }
1070            Execution::ExplainPushdown {
1071                imports,
1072                determination,
1073            } => {
1074                // # From peek_explain_pushdown
1075
1076                let as_of = determination.timestamp_context.antichain();
1077                let mz_now = determination
1078                    .timestamp_context
1079                    .timestamp()
1080                    .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1081                    .unwrap_or_else(ResultSpec::value_all);
1082
1083                Ok(Some(
1084                    coord::sequencer::explain_pushdown_future_inner(
1085                        session,
1086                        &*catalog,
1087                        &self.storage_collections,
1088                        as_of,
1089                        mz_now,
1090                        imports,
1091                    )
1092                    .await
1093                    .await?,
1094                ))
1095            }
1096            Execution::Peek {
1097                global_lir_plan,
1098                optimization_finished_at: _optimization_finished_at,
1099                plan_insights_optimizer_trace,
1100                insights_ctx,
1101            } => {
1102                // Continue with normal execution
1103                // # From peek_finish
1104
1105                let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
1106
1107                coord::sequencer::emit_optimizer_notices(
1108                    &*catalog,
1109                    session,
1110                    &df_meta.optimizer_notices,
1111                );
1112
1113                // Generate plan insights notice if needed
1114                if let Some(trace) = plan_insights_optimizer_trace {
1115                    let target_cluster = catalog.get_cluster(target_cluster_id);
1116                    let features = OptimizerFeatures::from(catalog.system_config())
1117                        .override_from(&target_cluster.config.features());
1118                    let insights = trace
1119                        .into_plan_insights(
1120                            &features,
1121                            &catalog.for_session(session),
1122                            Some(select_plan.finishing.clone()),
1123                            Some(target_cluster),
1124                            df_meta.clone(),
1125                            insights_ctx,
1126                        )
1127                        .await?;
1128                    session.add_notice(AdapterNotice::PlanInsights(insights));
1129                }
1130
1131                // # Now back to peek_finish
1132
                let watch_set = statement_logging_id.map(|logging_id| {
                    WatchSetCreation::new(
                        logging_id,
                        catalog.state(),
                        &input_id_bundle,
                        determination.timestamp_context.timestamp_or_default(),
                    )
                });

                let max_result_size = catalog.system_config().max_result_size();

                // Clone determination if we need it for emit_timestamp_notice, since it may be
                // moved into Command::ExecuteSlowPathPeek.
                let determination_for_notice = if session.vars().emit_timestamp_notice() {
                    Some(determination.clone())
                } else {
                    None
                };

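                // Fast-path peeks can be issued directly from the session task, while slow-path
                // peeks need a transient dataflow and are therefore handed off to the
                // coordinator.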
                let response = match peek_plan {
                    PeekPlan::FastPath(fast_path_plan) => {
                        if let Some(logging_id) = &statement_logging_id {
                            // TODO(peek-seq): Actually, we should also log this for
                            // FastPathPlan::Constant. The only reason we don't at the moment is to
                            // match the old peek sequencing, so that statement logging tests pass
                            // with frontend peek sequencing turned both on and off.
                            //
                            // When the old sequencing is removed, we should make a couple of
                            // changes in how we log timestamps:
                            // - Move this up to just after timestamp determination, so that it
                            //   appears in the log as soon as possible.
                            // - Do it also for Constant peeks.
                            // - Currently, slow-path peeks' timestamp logging is done by
                            //   `implement_peek_plan`. We could remove it from there, and just do
                            //   it here.
                            if !matches!(fast_path_plan, FastPathPlan::Constant(..)) {
                                self.log_set_timestamp(
                                    *logging_id,
                                    determination.timestamp_context.timestamp_or_default(),
                                );
                            }
                        }

                        let row_set_finishing_seconds =
                            session.metrics().row_set_finishing_seconds().clone();

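                        // Limits (from dyncfgs) for reading back results that the cluster has
                        // stashed because they were too large for a direct peek response.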
                        let peek_stash_read_batch_size_bytes =
                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
                                .get(catalog.system_config().dyncfgs());
                        let peek_stash_read_memory_budget_bytes =
                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
                                .get(catalog.system_config().dyncfgs());

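                        // Issue the fast-path peek directly from the session task; no new
                        // dataflow has to be installed for this.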
                        self.implement_fast_path_peek_plan(
                            fast_path_plan,
                            determination.timestamp_context.timestamp_or_default(),
                            select_plan.finishing,
                            target_cluster_id,
                            target_replica,
                            typ,
                            max_result_size,
                            max_query_result_size,
                            row_set_finishing_seconds,
                            read_holds,
                            peek_stash_read_batch_size_bytes,
                            peek_stash_read_memory_budget_bytes,
                            session.conn_id().clone(),
                            source_ids,
                            watch_set,
                        )
                        .await?
                    }
                    PeekPlan::SlowPath(dataflow_plan) => {
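                        // The scoped block below only contains soft-assert sanity checks on the
                        // read holds; it has no effect on execution.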
                        {
                            // Assert that we have a read hold for each import of the dataflow.
                            for id in dataflow_plan.desc.source_imports.keys() {
                                soft_assert_or_log!(
                                    read_holds.storage_holds.contains_key(id),
                                    "missing read hold for the source import {}",
                                    id
                                );
                            }
                            for id in dataflow_plan.desc.index_imports.keys() {
                                soft_assert_or_log!(
                                    read_holds
                                        .compute_ids()
                                        .map(|(_instance, coll)| coll)
                                        .contains(id),
                                    "missing read hold for the index import {}",
                                    id,
                                );
                            }

                            // Also check the holds against the as_of.
                            for (id, h) in read_holds.storage_holds.iter() {
                                let as_of = dataflow_plan
                                    .desc
                                    .as_of
                                    .clone()
                                    .expect("dataflow has an as_of")
                                    .into_element();
                                soft_assert_or_log!(
                                    h.since().less_equal(&as_of),
                                    "storage read hold at {:?} for collection {} is not enough for as_of {:?}",
                                    h.since(),
                                    id,
                                    as_of
                                );
                            }
                            for ((instance, id), h) in read_holds.compute_holds.iter() {
                                soft_assert_eq_or_log!(
                                    *instance,
                                    target_cluster_id,
                                    "the read hold on {} is on the wrong cluster",
                                    id
                                );
                                let as_of = dataflow_plan
                                    .desc
                                    .as_of
                                    .clone()
                                    .expect("dataflow has an as_of")
                                    .into_element();
                                soft_assert_or_log!(
                                    h.since().less_equal(&as_of),
                                    "compute read hold at {:?} for collection {} is not enough for as_of {:?}",
                                    h.since(),
                                    id,
                                    as_of
                                );
                            }
                        }

                        if let Some(logging_id) = &statement_logging_id {
                            self.log_set_transient_index_id(*logging_id, dataflow_plan.id);
                        }

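                        // Hand the peek over to the coordinator, which installs the transient
                        // dataflow and runs the peek against it.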
                        self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
                            dataflow_plan: Box::new(dataflow_plan),
                            determination,
                            finishing: select_plan.finishing,
                            compute_instance: target_cluster_id,
                            target_replica,
                            intermediate_result_type: typ,
                            source_ids,
                            conn_id: session.conn_id().clone(),
                            max_result_size,
                            max_query_result_size,
                            watch_set,
                            tx,
                        })
                        .await?
                    }
                };

                // Add timestamp notice if emit_timestamp_notice is enabled
                if let Some(determination) = determination_for_notice {
                    let explanation = self
                        .call_coordinator(|tx| Command::ExplainTimestamp {
                            conn_id: session.conn_id().clone(),
                            session_wall_time: session.pcx().wall_time,
                            cluster_id: target_cluster_id,
                            id_bundle: input_id_bundle.clone(),
                            determination,
                            tx,
                        })
                        .await;
                    session.add_notice(AdapterNotice::QueryTimestamp { explanation });
                }

                Ok(Some(match select_plan.copy_to {
                    None => response,
                    // COPY TO STDOUT
                    Some(format) => ExecuteResponse::CopyTo {
                        format,
                        resp: Box::new(response),
                    },
                }))
            }
            Execution::CopyToS3 {
                global_lir_plan,
                source_ids,
            } => {
                let (df_desc, df_meta) = global_lir_plan.unapply();

                coord::sequencer::emit_optimizer_notices(
                    &*catalog,
                    session,
                    &df_meta.optimizer_notices,
                );

                // Extract S3 sink connection info for preflight check
                let sink_id = df_desc.sink_id();
                let sinks = &df_desc.sink_exports;
                if sinks.len() != 1 {
                    return Err(AdapterError::Internal(
                        "expected exactly one copy to s3 sink".into(),
                    ));
                }
                let (_, sink_desc) = sinks
                    .first_key_value()
                    .expect("known to be exactly one copy to s3 sink");
                let s3_sink_connection = match &sink_desc.connection {
                    mz_compute_types::sinks::ComputeSinkConnection::CopyToS3Oneshot(conn) => {
                        conn.clone()
                    }
                    _ => {
                        return Err(AdapterError::Internal(
                            "expected copy to s3 oneshot sink".into(),
                        ));
                    }
                };

                // Perform the S3 preflight check in a background task (via the coordinator).
                // This runs slow S3 operations without blocking the coordinator's main task.
                self.call_coordinator(|tx| Command::CopyToPreflight {
                    s3_sink_connection,
                    sink_id,
                    tx,
                })
                .await?;

                // Preflight succeeded; now execute the actual COPY TO dataflow.
                let watch_set = statement_logging_id.map(|logging_id| {
                    WatchSetCreation::new(
                        logging_id,
                        catalog.state(),
                        &input_id_bundle,
                        determination.timestamp_context.timestamp_or_default(),
                    )
                });

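                // Ask the coordinator to execute the COPY TO dataflow on the target cluster.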
                let response = self
                    .call_coordinator(|tx| Command::ExecuteCopyTo {
                        df_desc: Box::new(df_desc),
                        compute_instance: target_cluster_id,
                        target_replica,
                        source_ids,
                        conn_id: session.conn_id().clone(),
                        watch_set,
                        tx,
                    })
                    .await?;

                Ok(Some(response))
            }
        }
    }

    /// Determines the timestamp for a query, acquires read holds that ensure the
    /// query remains executable at that time, and returns both.
    /// (Similar to `Coordinator::determine_timestamp`.)
    /// The caller is responsible for eventually dropping those read holds.
    ///
    /// Note: `self` is taken as `&mut` because of the lazy fetching in `get_compute_instance_client`.
    pub(crate) async fn frontend_determine_timestamp(
        &mut self,
        catalog_state: &CatalogState,
        session: &Session,
        id_bundle: &CollectionIdBundle,
        when: &QueryWhen,
        compute_instance: ComputeInstanceId,
        timeline_context: &TimelineContext,
        oracle_read_ts: Option<Timestamp>,
        real_time_recency_ts: Option<Timestamp>,
    ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
        // This is copy-pasted from the Coordinator.

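        // Whether (and in which mode) constraint-based timestamp selection is used is controlled
        // by a dyncfg.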
        let constraint_based = ConstraintBasedTimestampSelection::from_str(
            &CONSTRAINT_BASED_TIMESTAMP_SELECTION.get(catalog_state.system_config().dyncfgs()),
        );

        let isolation_level = session.vars().transaction_isolation();

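        // Acquire read holds on every collection in the id bundle and learn their least valid
        // write frontier. The holds keep the chosen timestamp readable until the caller drops
        // them.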
        let (read_holds, upper) = self
            .acquire_read_holds_and_least_valid_write(id_bundle)
            .await
            .map_err(|err| {
                AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
                    err,
                    compute_instance,
                )
            })?;
        let (det, read_holds) = <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
            session,
            id_bundle,
            when,
            compute_instance,
            timeline_context,
            oracle_read_ts,
            real_time_recency_ts,
            isolation_level,
            &constraint_based,
            read_holds,
            upper.clone(),
        )?;

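        // Count this determination, labeled by whether we can respond immediately, the isolation
        // level, the target cluster, and the timestamp selection strategy.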
        session
            .metrics()
            .determine_timestamp(&[
                match det.respond_immediately() {
                    true => "true",
                    false => "false",
                },
                isolation_level.as_str(),
                &compute_instance.to_string(),
                constraint_based.as_str(),
            ])
            .inc();
        if !det.respond_immediately()
            && isolation_level == &IsolationLevel::StrictSerializable
            && real_time_recency_ts.is_none()
        {
            // Record the difference between the StrictSerializable timestamp and the timestamp
            // that Serializable isolation would have chosen in a metric.
            if let Some(strict) = det.timestamp_context.timestamp() {
                let (serializable_det, _tmp_read_holds) =
                    <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
                        session,
                        id_bundle,
                        when,
                        compute_instance,
                        timeline_context,
                        oracle_read_ts,
                        real_time_recency_ts,
                        &IsolationLevel::Serializable,
                        &constraint_based,
                        read_holds.clone(),
                        upper,
                    )?;
                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
                    session
                        .metrics()
                        .timestamp_difference_for_strict_serializable_ms(&[
                            compute_instance.to_string().as_ref(),
                            constraint_based.as_str(),
                        ])
                        .observe(f64::cast_lossy(u64::from(
                            strict.saturating_sub(*serializable),
                        )));
                }
            }
        }

        Ok((det, read_holds))
    }
}