// mz_adapter/frontend_peek.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

10use std::collections::BTreeMap;
11use std::collections::BTreeSet;
12use std::sync::Arc;
13
14use itertools::{Either, Itertools};
15use mz_compute_types::ComputeInstanceId;
16use mz_controller_types::ClusterId;
17use mz_expr::{CollectionPlan, ResultSpec};
18use mz_ore::cast::{CastFrom, CastLossy};
19use mz_ore::collections::CollectionExt;
20use mz_ore::now::EpochMillis;
21use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
22use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
23use mz_repr::role_id::RoleId;
24use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
25use mz_sql::ast::Raw;
26use mz_sql::catalog::CatalogCluster;
27use mz_sql::plan::Params;
28use mz_sql::plan::{self, Explainee, ExplaineeStatement, Plan, QueryWhen};
29use mz_sql::rbac;
30use mz_sql::session::metadata::SessionMetadata;
31use mz_sql::session::vars::IsolationLevel;
32use mz_sql_parser::ast::{CopyDirection, CopyRelation, ExplainStage, ShowStatement, Statement};
33use mz_transform::EmptyStatisticsOracle;
34use mz_transform::dataflow::DataflowMetainfo;
35use opentelemetry::trace::TraceContextExt;
36use tracing::{Span, debug, warn};
37use tracing_opentelemetry::OpenTelemetrySpanExt;
38
39use crate::catalog::Catalog;
40use crate::command::Command;
41use crate::coord::peek::{FastPathPlan, PeekPlan};
42use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle};
43use crate::coord::timeline::timedomain_for;
44use crate::coord::timestamp_selection::TimestampDetermination;
45use crate::coord::{
46    Coordinator, CopyToContext, ExecuteContextGuard, ExplainContext, ExplainPlanContext,
47    TargetCluster,
48};
49use crate::explain::insights::PlanInsightsContext;
50use crate::explain::optimizer_trace::OptimizerTrace;
51use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
52use crate::optimize::{Optimize, OptimizerError};
53use crate::session::{Session, TransactionOps, TransactionStatus};
54use crate::statement_logging::WatchSetCreation;
55use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
56use crate::{
57    AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, PeekClient, ReadHolds,
58    TimelineContext, TimestampContext, TimestampProvider, optimize,
59};
60use crate::{coord, metrics};
61
62impl PeekClient {
63    /// Attempt to sequence a peek from the session task.
64    ///
65    /// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the
66    /// Coordinator's sequencing. If it returns an error, it should be returned to the user.
67    ///
68    /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
69    /// triggering the execution of the underlying query.
70    pub(crate) async fn try_frontend_peek(
71        &mut self,
72        portal_name: &str,
73        session: &mut Session,
74        outer_ctx_extra: &mut Option<ExecuteContextGuard>,
75    ) -> Result<Option<ExecuteResponse>, AdapterError> {
76        // # From handle_execute
77
78        if session.vars().emit_trace_id_notice() {
79            let span_context = tracing::Span::current()
80                .context()
81                .span()
82                .span_context()
83                .clone();
84            if span_context.is_valid() {
85                session.add_notice(AdapterNotice::QueryTrace {
86                    trace_id: span_context.trace_id(),
87                });
88            }
89        }
90
91        // TODO(peek-seq): This snapshot is wasted when we end up bailing out from the frontend peek
92        // sequencing. We could solve this is with that optimization where we
93        // continuously keep a catalog snapshot in the session, and only get a new one when the
94        // catalog revision has changed, which we could see with an atomic read.
95        // But anyhow, this problem will just go away when we reach the point that we never fall
96        // back to the old sequencing.
97        let catalog = self.catalog_snapshot("try_frontend_peek").await;
98
99        // Extract things from the portal.
100        let (stmt, params, logging, lifecycle_timestamps) = {
101            if let Err(err) = Coordinator::verify_portal(&*catalog, session, portal_name) {
102                outer_ctx_extra
103                    .take()
104                    .and_then(|guard| guard.defuse().retire());
105                return Err(err);
106            }
107            let portal = session
108                .get_portal_unverified(portal_name)
109                // The portal is a session-level thing, so it couldn't have concurrently disappeared
110                // since the above verification.
111                .expect("called verify_portal above");
112            let params = portal.parameters.clone();
113            let stmt = portal.stmt.clone();
114            let logging = Arc::clone(&portal.logging);
115            let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
116            (stmt, params, logging, lifecycle_timestamps)
117        };
118
119        // Before planning, check if this is a statement type we can handle.
120        // This must happen BEFORE statement logging setup to avoid orphaned execution records.
121        if let Some(ref stmt) = stmt {
122            match &**stmt {
123                Statement::Select(_)
124                | Statement::ExplainAnalyzeObject(_)
125                | Statement::ExplainAnalyzeCluster(_)
126                | Statement::Show(ShowStatement::ShowObjects(_))
127                | Statement::Show(ShowStatement::ShowColumns(_)) => {
128                    // These are always fine, just continue.
129                    // Note: EXPLAIN ANALYZE will `plan` to `Plan::Select`.
130                    // Note: ShowObjects plans to `Plan::Select`, ShowColumns plans to `Plan::ShowColumns`.
131                    // We handle `Plan::ShowColumns` specially in `try_frontend_peek_inner`.
132                }
133                Statement::ExplainPlan(explain_stmt) => {
134                    // Only handle ExplainPlan for SELECT statements.
135                    // We don't want to handle e.g. EXPLAIN CREATE MATERIALIZED VIEW here, because that
136                    // requires purification before planning, which the frontend peek sequencing doesn't
137                    // do.
138                    match &explain_stmt.explainee {
139                        mz_sql_parser::ast::Explainee::Select(..) => {
140                            // This is a SELECT, continue
141                        }
142                        _ => {
143                            debug!(
144                                "Bailing out from try_frontend_peek, because EXPLAIN is not for a SELECT query"
145                            );
146                            return Ok(None);
147                        }
148                    }
149                }
150                Statement::ExplainPushdown(explain_stmt) => {
151                    // Only handle EXPLAIN FILTER PUSHDOWN for non-BROKEN SELECT statements
152                    match &explain_stmt.explainee {
153                        mz_sql_parser::ast::Explainee::Select(_, false) => {}
154                        _ => {
155                            debug!(
156                                "Bailing out from try_frontend_peek, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is for EXPLAIN BROKEN"
157                            );
158                            return Ok(None);
159                        }
160                    }
161                }
162                Statement::Copy(copy_stmt) => {
163                    match &copy_stmt.direction {
164                        CopyDirection::To => {
165                            // Check for SUBSCRIBE inside COPY TO - we don't handle Plan::Subscribe
166                            if matches!(&copy_stmt.relation, CopyRelation::Subscribe(_)) {
167                                debug!(
168                                    "Bailing out from try_frontend_peek, because COPY (SUBSCRIBE ...) TO is not supported"
169                                );
170                                return Ok(None);
171                            }
172                            // This is COPY TO (SELECT), continue
173                        }
174                        CopyDirection::From => {
175                            debug!(
176                                "Bailing out from try_frontend_peek, because COPY FROM is not supported"
177                            );
178                            return Ok(None);
179                        }
180                    }
181                }
182                _ => {
183                    debug!(
184                        "Bailing out from try_frontend_peek, because statement type is not supported"
185                    );
186                    return Ok(None);
187                }
188            }
189        }
190
191        // Set up statement logging, and log the beginning of execution.
192        // (But only if we're not executing in the context of another statement.)
193        let statement_logging_id = if outer_ctx_extra.is_none() {
194            // This is a new statement, so begin statement logging
195            let result = self.statement_logging_frontend.begin_statement_execution(
196                session,
197                &params,
198                &logging,
199                catalog.system_config(),
200                lifecycle_timestamps,
201            );
202
203            if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result {
204                self.log_began_execution(began_execution, mseh_update, prepared_statement);
205                Some(logging_id)
206            } else {
207                None
208            }
209        } else {
210            // We're executing in the context of another statement (e.g., FETCH),
211            // so extract the statement logging ID from the outer context if present.
212            // We take ownership and retire the outer context here. The end of execution will be
213            // logged in one of the following ways:
214            // - At the end of this function, if the execution is finished by then.
215            // - Later by the Coordinator, either due to RegisterFrontendPeek or ExecuteSlowPathPeek.
216            outer_ctx_extra
217                .take()
218                .and_then(|guard| guard.defuse().retire())
219        };
220
221        let result = self
222            .try_frontend_peek_inner(session, catalog, stmt, params, statement_logging_id)
223            .await;
224
225        // Log the end of execution if we are logging this statement and execution has already
226        // ended.
227        if let Some(logging_id) = statement_logging_id {
228            let reason = match &result {
229                // Streaming results are handled asynchronously by the coordinator
230                Ok(Some(ExecuteResponse::SendingRowsStreaming { .. })) => {
231                    // Don't log here - the peek is still executing.
232                    // It will be logged when handle_peek_notification is called.
233                    return result;
234                }
235                // COPY TO needs to check its inner response
236                Ok(Some(resp @ ExecuteResponse::CopyTo { resp: inner, .. })) => {
237                    match inner.as_ref() {
238                        ExecuteResponse::SendingRowsStreaming { .. } => {
239                            // Don't log here - the peek is still executing.
240                            // It will be logged when handle_peek_notification is called.
241                            return result;
242                        }
243                        // For non-streaming COPY TO responses, use the outer CopyTo for conversion
244                        _ => resp.into(),
245                    }
246                }
247                // Bailout case, which should not happen
248                Ok(None) => {
249                    soft_panic_or_log!(
250                        "Bailed out from `try_frontend_peek_inner` after we already logged the beginning of statement execution."
251                    );
252                    // This statement will be handled by the old peek sequencing, which will do its
253                    // own statement logging from the beginning. So, let's close out this one.
254                    self.log_ended_execution(
255                        logging_id,
256                        StatementEndedExecutionReason::Errored {
257                            error: "Internal error: bailed out from `try_frontend_peek_inner`"
258                                .to_string(),
259                        },
260                    );
261                    return result;
262                }
263                // All other success responses - use the From implementation
264                // TODO(peek-seq): After we delete the old peek sequencing, we'll be able to adjust
265                // the From implementation to do exactly what we need in the frontend peek
266                // sequencing, so that the above special cases won't be needed.
267                Ok(Some(resp)) => resp.into(),
268                Err(e) => StatementEndedExecutionReason::Errored {
269                    error: e.to_string(),
270                },
271            };
272
273            self.log_ended_execution(logging_id, reason);
274        }
275
276        result
277    }
278
279    /// This is encapsulated in an inner function so that the outer function can still do statement
280    /// logging after the `?` returns of the inner function.
281    async fn try_frontend_peek_inner(
282        &mut self,
283        session: &mut Session,
284        catalog: Arc<Catalog>,
285        stmt: Option<Arc<Statement<Raw>>>,
286        params: Params,
287        statement_logging_id: Option<crate::statement_logging::StatementLoggingId>,
288    ) -> Result<Option<ExecuteResponse>, AdapterError> {
289        let stmt = match stmt {
290            Some(stmt) => stmt,
291            None => {
292                debug!("try_frontend_peek_inner succeeded on an empty query");
293                return Ok(Some(ExecuteResponse::EmptyQuery));
294            }
295        };
296
297        session
298            .metrics()
299            .query_total(&[
300                metrics::session_type_label_value(session.user()),
301                metrics::statement_type_label_value(&stmt),
302            ])
303            .inc();
304
305        // # From handle_execute_inner
306
307        let conn_catalog = catalog.for_session(session);
308        // (`resolved_ids` should be derivable from `stmt`. If `stmt` is later transformed to
309        // remove/add IDs, then `resolved_ids` should be updated to also remove/add those IDs.)
310        let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?;
311
312        let pcx = session.pcx();
313        let plan = mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, &params, &resolved_ids)?;
314
315        let (select_plan, explain_ctx, copy_to_ctx) = match &plan {
316            Plan::Select(select_plan) => {
317                let explain_ctx = if session.vars().emit_plan_insights_notice() {
318                    let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
319                    ExplainContext::PlanInsightsNotice(optimizer_trace)
320                } else {
321                    ExplainContext::None
322                };
323                (select_plan, explain_ctx, None)
324            }
325            Plan::ShowColumns(show_columns_plan) => {
326                // ShowColumns wraps a SelectPlan, extract it and proceed as normal.
327                (&show_columns_plan.select_plan, ExplainContext::None, None)
328            }
329            Plan::ExplainPlan(plan::ExplainPlanPlan {
330                stage,
331                format,
332                config,
333                explainee: Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc }),
334            }) => {
335                // Create OptimizerTrace to collect optimizer plans
336                let optimizer_trace = OptimizerTrace::new(stage.paths());
337                let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
338                    broken: *broken,
339                    config: config.clone(),
340                    format: *format,
341                    stage: *stage,
342                    replan: None,
343                    desc: Some(desc.clone()),
344                    optimizer_trace,
345                });
346                (plan, explain_ctx, None)
347            }
348            // COPY TO S3
349            Plan::CopyTo(plan::CopyToPlan {
350                select_plan,
351                desc,
352                to,
353                connection,
354                connection_id,
355                format,
356                max_file_size,
357            }) => {
358                let uri = eval_copy_to_uri(to.clone(), session, catalog.state())?;
359
360                // (output_batch_count will be set later)
361                let copy_to_ctx = CopyToContext {
362                    desc: desc.clone(),
363                    uri,
364                    connection: connection.clone(),
365                    connection_id: *connection_id,
366                    format: format.clone(),
367                    max_file_size: *max_file_size,
368                    output_batch_count: None,
369                };
370
371                (select_plan, ExplainContext::None, Some(copy_to_ctx))
372            }
373            Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => {
374                // Only handle EXPLAIN FILTER PUSHDOWN for SELECT statements
375                match explainee {
376                    plan::Explainee::Statement(plan::ExplaineeStatement::Select {
377                        broken: false,
378                        plan,
379                        desc: _,
380                    }) => {
381                        let explain_ctx = ExplainContext::Pushdown;
382                        (plan, explain_ctx, None)
383                    }
384                    _ => {
385                        // This shouldn't happen because we already checked for this at the AST
386                        // level before calling `try_frontend_peek_inner`.
387                        soft_panic_or_log!(
388                            "unexpected EXPLAIN FILTER PUSHDOWN plan kind in frontend peek sequencing: {:?}",
389                            explainee
390                        );
391                        debug!(
392                            "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN"
393                        );
394                        return Ok(None);
395                    }
396                }
397            }
398            Plan::SideEffectingFunc(sef_plan) => {
399                // Side-effecting functions need Coordinator state (e.g., active_conns),
400                // so delegate to the Coordinator via a Command.
401                // The RBAC check is performed in the Coordinator where active_conns is available.
402                let response = self
403                    .call_coordinator(|tx| Command::ExecuteSideEffectingFunc {
404                        plan: sef_plan.clone(),
405                        conn_id: session.conn_id().clone(),
406                        current_role: session.role_metadata().current_role,
407                        tx,
408                    })
409                    .await?;
410                return Ok(Some(response));
411            }
412            _ => {
413                // This shouldn't happen because we already checked for this at the AST
414                // level before calling `try_frontend_peek_inner`.
415                soft_panic_or_log!(
416                    "Unexpected plan kind in frontend peek sequencing: {:?}",
417                    plan
418                );
419                debug!(
420                    "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, side-effecting SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO S3"
421                );
422                return Ok(None);
423            }
424        };
425
426        // # From sequence_plan
427
428        // We have checked the plan kind above.
429        assert!(plan.allowed_in_read_only());
430
431        let target_cluster = match session.transaction().cluster() {
432            // Use the current transaction's cluster.
433            Some(cluster_id) => TargetCluster::Transaction(cluster_id),
434            // If there isn't a current cluster set for a transaction, then try to auto route.
435            None => {
436                coord::catalog_serving::auto_run_on_catalog_server(&conn_catalog, session, &plan)
437            }
438        };
439        let (cluster, target_cluster_id, target_cluster_name) = {
440            let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
441            (cluster, cluster.id, &cluster.name)
442        };
443
444        // Log cluster selection
445        if let Some(logging_id) = &statement_logging_id {
446            self.log_set_cluster(*logging_id, target_cluster_id);
447        }
448
449        coord::catalog_serving::check_cluster_restrictions(
450            target_cluster_name.as_str(),
451            &conn_catalog,
452            &plan,
453        )?;
454
455        rbac::check_plan(
456            &conn_catalog,
457            // We can't look at `active_conns` here, but that's ok, because this case was handled
458            // above already inside `Command::ExecuteSideEffectingFunc`.
459            None::<fn(u32) -> Option<RoleId>>,
460            session,
461            &plan,
462            Some(target_cluster_id),
463            &resolved_ids,
464        )?;
465
466        if let Some((_, wait_future)) =
467            coord::appends::waiting_on_startup_appends(&*catalog, session, &plan)
468        {
469            wait_future.await;
470        }
471
472        let max_query_result_size = Some(session.vars().max_query_result_size());
473
474        // # From sequence_peek
475
476        // # From peek_validate
477
478        let compute_instance_snapshot =
479            ComputeInstanceSnapshot::new_without_collections(cluster.id());
480
481        let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config())
482            .override_from(&catalog.get_cluster(cluster.id()).config.features())
483            .override_from(&explain_ctx);
484
485        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
486            return Err(AdapterError::NoClusterReplicasAvailable {
487                name: cluster.name.clone(),
488                is_managed: cluster.is_managed(),
489            });
490        }
491
492        let (_, view_id) = self.transient_id_gen.allocate_id();
493        let (_, index_id) = self.transient_id_gen.allocate_id();
494
495        let mut optimizer = if let Some(mut copy_to_ctx) = copy_to_ctx {
496            // COPY TO path: calculate output_batch_count and create copy_to optimizer
497            let worker_counts = cluster.replicas().map(|r| {
498                let loc = &r.config.location;
499                loc.workers().unwrap_or_else(|| loc.num_processes())
500            });
501            let max_worker_count = match worker_counts.max() {
502                Some(count) => u64::cast_from(count),
503                None => {
504                    return Err(AdapterError::NoClusterReplicasAvailable {
505                        name: cluster.name.clone(),
506                        is_managed: cluster.is_managed(),
507                    });
508                }
509            };
510            copy_to_ctx.output_batch_count = Some(max_worker_count);
511
512            Either::Right(optimize::copy_to::Optimizer::new(
513                Arc::clone(&catalog),
514                compute_instance_snapshot.clone(),
515                view_id,
516                copy_to_ctx,
517                optimizer_config,
518                self.optimizer_metrics.clone(),
519            ))
520        } else {
521            // SELECT/EXPLAIN path: create peek optimizer
522            Either::Left(optimize::peek::Optimizer::new(
523                Arc::clone(&catalog),
524                compute_instance_snapshot.clone(),
525                select_plan.finishing.clone(),
526                view_id,
527                index_id,
528                optimizer_config,
529                self.optimizer_metrics.clone(),
530            ))
531        };
532
533        let target_replica_name = session.vars().cluster_replica();
534        let mut target_replica = target_replica_name
535            .map(|name| {
536                cluster
537                    .replica_id(name)
538                    .ok_or(AdapterError::UnknownClusterReplica {
539                        cluster_name: cluster.name.clone(),
540                        replica_name: name.to_string(),
541                    })
542            })
543            .transpose()?;
544
545        let source_ids = select_plan.source.depends_on();
546        // TODO(peek-seq): validate_timeline_context can be expensive in real scenarios (not in
547        // simple benchmarks), because it traverses transitive dependencies even of indexed views and
548        // materialized views (also traversing their MIR plans).
549        let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?;
550        if matches!(timeline_context, TimelineContext::TimestampIndependent)
551            && select_plan.source.contains_temporal()?
552        {
553            // If the source IDs are timestamp independent but the query contains temporal functions,
554            // then the timeline context needs to be upgraded to timestamp dependent. This is
555            // required because `source_ids` doesn't contain functions.
556            timeline_context = TimelineContext::TimestampDependent;
557        }
558
559        let notices = coord::sequencer::check_log_reads(
560            &catalog,
561            cluster,
562            &source_ids,
563            &mut target_replica,
564            session.vars(),
565        )?;
566        session.add_notices(notices);
567
568        // # From peek_linearize_timestamp
569
570        let isolation_level = session.vars().transaction_isolation().clone();
571        let timeline = Coordinator::get_timeline(&timeline_context);
572        let needs_linearized_read_ts =
573            Coordinator::needs_linearized_read_ts(&isolation_level, &select_plan.when);
574
575        let oracle_read_ts = match timeline {
576            Some(timeline) if needs_linearized_read_ts => {
577                let oracle = self.ensure_oracle(timeline).await?;
578                let oracle_read_ts = oracle.read_ts().await;
579                Some(oracle_read_ts)
580            }
581            Some(_) | None => None,
582        };
583
584        // # From peek_real_time_recency
585
586        let vars = session.vars();
587        let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
588            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
589            && !session.contains_read_timestamp()
590        {
591            // Only call the coordinator when we actually need real-time recency
592            self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
593                source_ids: source_ids.clone(),
594                real_time_recency_timeout: *vars.real_time_recency_timeout(),
595                tx,
596            })
597            .await?
598        } else {
599            None
600        };
601
602        // # From peek_timestamp_read_hold
603
604        let dataflow_builder = DataflowBuilder::new(catalog.state(), compute_instance_snapshot);
605        let input_id_bundle = dataflow_builder.sufficient_collections(source_ids.clone());
606
607        // ## From sequence_peek_timestamp
608
609        // Warning: This will be false for AS OF queries, even if we are otherwise inside a
610        // multi-statement transaction. (It's also false for FreshestTableWrite, which is currently
611        // only read-then-write queries, which can't be part of multi-statement transactions, so
612        // FreshestTableWrite doesn't matter.)
613        //
614        // TODO(peek-seq): It's not totally clear to me what the intended semantics are for AS OF
615        // queries inside a transaction: We clearly can't use the transaction timestamp, but the old
616        // peek sequencing still does a timedomain validation. The new peek sequencing does not do
617        // timedomain validation for AS OF queries, which seems more natural. But I'm thinking that
618        // it would be the cleanest to just simply disallow AS OF queries inside transactions.
619        let in_immediate_multi_stmt_txn = session
620            .transaction()
621            .in_immediate_multi_stmt_txn(&select_plan.when);
622
623        // Fetch or generate a timestamp for this query and fetch or acquire read holds.
        // Determine the query timestamp and obtain read holds for the accessed collections:
        // either reuse the transaction's existing determination (subsequent statements of a
        // multi-statement transaction) or determine a fresh timestamp now.
        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
            // Use the transaction's timestamp if it exists and this isn't an AS OF query.
            // (`in_immediate_multi_stmt_txn` is false for AS OF queries.)
            Some(
                determination @ TimestampDetermination {
                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
                    ..
                },
            ) if in_immediate_multi_stmt_txn => {
                // This is a subsequent (non-AS OF, non-constant) query in a multi-statement
                // transaction. We now:
                // - Validate that the query only accesses collections within the transaction's
                //   timedomain (which we know from the stored read holds).
                // - Use the transaction's stored timestamp determination.
                // - Use the (relevant subset of the) transaction's read holds.

                let txn_read_holds_opt = self
                    .call_coordinator(|tx| Command::GetTransactionReadHoldsBundle {
                        conn_id: session.conn_id().clone(),
                        tx,
                    })
                    .await;

                if let Some(txn_read_holds) = txn_read_holds_opt {
                    let allowed_id_bundle = txn_read_holds.id_bundle();
                    // Collections accessed by this query that fall outside the transaction's
                    // timedomain.
                    let outside = input_id_bundle.difference(&allowed_id_bundle);

                    // Queries without a timestamp and timeline can belong to any existing timedomain.
                    if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
                        let valid_names =
                            allowed_id_bundle.resolve_names(&*catalog, session.conn_id());
                        let invalid_names = outside.resolve_names(&*catalog, session.conn_id());
                        return Err(AdapterError::RelationOutsideTimeDomain {
                            relations: invalid_names,
                            names: valid_names,
                        });
                    }

                    // Extract the subset of read holds for the collections this query accesses.
                    let read_holds = txn_read_holds.subset(&input_id_bundle);

                    (determination, read_holds)
                } else {
                    // This should never happen: we're in a subsequent query of a multi-statement
                    // transaction (we have a transaction timestamp), but the coordinator has no
                    // transaction read holds stored. This indicates a bug in the transaction
                    // handling.
                    return Err(AdapterError::Internal(
                        "Missing transaction read holds for multi-statement transaction"
                            .to_string(),
                    ));
                }
            }
            _ => {
                // There is no timestamp determination yet for this transaction. Either:
                // - We are not in a multi-statement transaction.
                // - This is the first (non-AS OF) query in a multi-statement transaction.
                // - This is an AS OF query.
                // - This is a constant query (`TimestampContext::NoTimestamp`).

                // Declared out here so the bundle outlives the reference bound in
                // `determine_bundle` below.
                let timedomain_bundle;
                let determine_bundle = if in_immediate_multi_stmt_txn {
                    // This is the first (non-AS OF) query in a multi-statement transaction.
                    // Determine a timestamp that will be valid for anything in any schema
                    // referenced by the first query.
                    timedomain_bundle = timedomain_for(
                        &*catalog,
                        &dataflow_builder,
                        &source_ids,
                        &timeline_context,
                        session.conn_id(),
                        target_cluster_id,
                    )?;
                    &timedomain_bundle
                } else {
                    // Simply use the inputs of the current query.
                    &input_id_bundle
                };
                let (determination, read_holds) = self
                    .frontend_determine_timestamp(
                        session,
                        determine_bundle,
                        &select_plan.when,
                        target_cluster_id,
                        &timeline_context,
                        oracle_read_ts,
                        real_time_recency_ts,
                    )
                    .await?;

                // If this is the first (non-AS OF) query in a multi-statement transaction, store
                // the read holds in the coordinator, so subsequent queries can validate against
                // them.
                if in_immediate_multi_stmt_txn {
                    self.call_coordinator(|tx| Command::StoreTransactionReadHolds {
                        conn_id: session.conn_id().clone(),
                        read_holds: read_holds.clone(),
                        tx,
                    })
                    .await;
                }

                (determination, read_holds)
            }
        };
729
        // Sanity checks (soft asserts only; they log rather than abort in production).
        {
            // Assert that we have a read hold for all the collections in our `input_id_bundle`.
            for id in input_id_bundle.iter() {
                // A hold may be either a storage hold or a compute hold.
                let s = read_holds.storage_holds.contains_key(&id);
                let c = read_holds
                    .compute_ids()
                    .map(|(_instance, coll)| coll)
                    .contains(&id);
                soft_assert_or_log!(
                    s || c,
                    "missing read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
                    id,
                    in_immediate_multi_stmt_txn,
                );
            }

            // Assert that each part of the `input_id_bundle` corresponds to the right part of
            // `read_holds`.
            for id in input_id_bundle.storage_ids.iter() {
                soft_assert_or_log!(
                    read_holds.storage_holds.contains_key(id),
                    "missing storage read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
                    id,
                    in_immediate_multi_stmt_txn,
                );
            }
            for id in input_id_bundle
                .compute_ids
                .iter()
                .flat_map(|(_instance, colls)| colls)
            {
                soft_assert_or_log!(
                    read_holds
                        .compute_ids()
                        .map(|(_instance, coll)| coll)
                        .contains(id),
                    "missing compute read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
                    id,
                    in_immediate_multi_stmt_txn,
                );
            }
        }
772
        // (TODO(peek-seq): The below TODO is copied from the old peek sequencing. We should resolve
        // this when we decide what to do with `AS OF` in transactions.)
        // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
        // arbitrary and we don't recall why we did it (possibly an error!). Change this to always
        // set the transaction ops. Decide and document what our policy should be on AS OF queries.
        // Maybe they shouldn't be allowed in transactions at all because it's hard to explain
        // what's going on there. This should probably get a small design document.

        // We only track the peeks in the session if the query doesn't use AS
        // OF or we're inside an explicit transaction. The latter case is
        // necessary to support PG's `BEGIN` semantics, whose behavior can
        // depend on whether or not reads have occurred in the txn.
        let requires_linearization = (&explain_ctx).into();
        let mut transaction_determination = determination.clone();
        if select_plan.when.is_transactional() {
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id: target_cluster_id,
                requires_linearization,
            })?;
        } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
            // If the query uses AS OF, then ignore the timestamp.
            transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id: target_cluster_id,
                requires_linearization,
            })?;
        };
802
        // # From peek_optimize

        // Fetch cardinality statistics for the source collections; if that fails, fall
        // back to an oracle that reports no statistics.
        let stats = statistics_oracle(
            session,
            &source_ids,
            &determination.timestamp_context.antichain(),
            true,
            catalog.system_config(),
            &*self.storage_collections,
        )
        .await
        .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));

        // Generate data structures that can be moved to another task where we will perform possibly
        // expensive optimizations.
        let timestamp_context = determination.timestamp_context.clone();
        let session_meta = session.meta();
        let now = catalog.config().now.clone();
        let select_plan = select_plan.clone();
        let target_cluster_name = target_cluster_name.clone();
        let needs_plan_insights = explain_ctx.needs_plan_insights();
        let determination_for_pushdown = if matches!(explain_ctx, ExplainContext::Pushdown) {
            // This is a hairy data structure, so avoid this clone if we are not in
            // EXPLAIN FILTER PUSHDOWN.
            Some(determination.clone())
        } else {
            None
        };

        // Capture the current tracing span so the optimization task reports under it.
        let span = Span::current();

        // Prepare data for plan insights if needed
        let catalog_for_insights = if needs_plan_insights {
            Some(Arc::clone(&catalog))
        } else {
            None
        };
        let mut compute_instances = BTreeMap::new();
        if needs_plan_insights {
            for user_cluster in catalog.user_clusters() {
                let snapshot = ComputeInstanceSnapshot::new_without_collections(user_cluster.id);
                compute_instances.insert(user_cluster.name.clone(), snapshot);
            }
        }
847
        let source_ids_for_closure = source_ids.clone();
        // Run the (possibly expensive) optimization pipeline on a blocking task so it
        // doesn't stall the async executor.
        let optimization_future = mz_ore::task::spawn_blocking(
            || "optimize peek",
            move || {
                span.in_scope(|| {
                    let _dispatch_guard = explain_ctx.dispatch_guard();

                    let raw_expr = select_plan.source.clone();

                    // The purpose of wrapping the following in a closure is to control where the
                    // `?`s return from, so that even when a `catch_unwind_optimize` call fails,
                    // we can still handle `EXPLAIN BROKEN`.
                    let pipeline = || -> Result<
                        Either<
                            optimize::peek::GlobalLirPlan,
                            optimize::copy_to::GlobalLirPlan,
                        >,
                        OptimizerError,
                    > {
                        match optimizer.as_mut() {
                            Either::Left(optimizer) => {
                                // SELECT/EXPLAIN path
                                // HIR ⇒ MIR lowering and MIR optimization (local)
                                let local_mir_plan =
                                    optimizer.catch_unwind_optimize(raw_expr.clone())?;
                                // Attach resolved context required to continue the pipeline.
                                let local_mir_plan = local_mir_plan.resolve(
                                    timestamp_context.clone(),
                                    &session_meta,
                                    stats,
                                );
                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
                                let global_lir_plan =
                                    optimizer.catch_unwind_optimize(local_mir_plan)?;
                                Ok(Either::Left(global_lir_plan))
                            }
                            Either::Right(optimizer) => {
                                // COPY TO path
                                // HIR ⇒ MIR lowering and MIR optimization (local)
                                let local_mir_plan =
                                    optimizer.catch_unwind_optimize(raw_expr.clone())?;
                                // Attach resolved context required to continue the pipeline.
                                let local_mir_plan = local_mir_plan.resolve(
                                    timestamp_context.clone(),
                                    &session_meta,
                                    stats,
                                );
                                // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
                                let global_lir_plan =
                                    optimizer.catch_unwind_optimize(local_mir_plan)?;
                                Ok(Either::Right(global_lir_plan))
                            }
                        }
                    };

                    let global_lir_plan_result = pipeline();
                    let optimization_finished_at = now();

                    // Builds the (optional) plan-insights context. Returns `None` when insights
                    // are not requested or the insights catalog was not captured above.
                    let create_insights_ctx =
                        |optimizer: &optimize::peek::Optimizer,
                         is_notice: bool|
                         -> Option<Box<PlanInsightsContext>> {
                            if !needs_plan_insights {
                                return None;
                            }

                            let catalog = catalog_for_insights.as_ref()?;

                            let enable_re_optimize = if needs_plan_insights {
                                // Disable any plan insights that use the optimizer if we only want the
                                // notice and plan optimization took longer than the threshold. This is
                                // to prevent a situation where optimizing takes a while and there are
                                // lots of clusters, which would delay peek execution by the product of
                                // those.
                                //
                                // (This heuristic doesn't work well, see #9492.)
                                let dyncfgs = catalog.system_config().dyncfgs();
                                let opt_limit = mz_adapter_types::dyncfgs
                                ::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
                                .get(dyncfgs);
                                !(is_notice && optimizer.duration() > opt_limit)
                            } else {
                                false
                            };

                            Some(Box::new(PlanInsightsContext {
                                stmt: select_plan
                                    .select
                                    .as_deref()
                                    .map(Clone::clone)
                                    .map(Statement::Select),
                                raw_expr: raw_expr.clone(),
                                catalog: Arc::clone(catalog),
                                compute_instances,
                                target_instance: target_cluster_name,
                                metrics: optimizer.metrics().clone(),
                                finishing: optimizer.finishing().clone(),
                                optimizer_config: optimizer.config().clone(),
                                session: session_meta,
                                timestamp_context,
                                view_id: optimizer.select_id(),
                                index_id: optimizer.index_id(),
                                enable_re_optimize,
                            }))
                        };

                    // Translate the pipeline result into an `Execution` describing what to do
                    // back on the async side.
                    match global_lir_plan_result {
                        Ok(Either::Left(global_lir_plan)) => {
                            // SELECT/EXPLAIN path
                            let optimizer = optimizer.unwrap_left();
                            match explain_ctx {
                                ExplainContext::Plan(explain_ctx) => {
                                    let (_, df_meta, _) = global_lir_plan.unapply();
                                    let insights_ctx = create_insights_ctx(&optimizer, false);
                                    Ok(Execution::ExplainPlan {
                                        df_meta,
                                        explain_ctx,
                                        optimizer,
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::None => Ok(Execution::Peek {
                                    global_lir_plan,
                                    optimization_finished_at,
                                    plan_insights_optimizer_trace: None,
                                    insights_ctx: None,
                                }),
                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
                                    let insights_ctx = create_insights_ctx(&optimizer, true);
                                    Ok(Execution::Peek {
                                        global_lir_plan,
                                        optimization_finished_at,
                                        plan_insights_optimizer_trace: Some(optimizer_trace),
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::Pushdown => {
                                    // Collect the MFPs of the slow-path source imports; fast
                                    // path has no imports to report.
                                    let (plan, _, _) = global_lir_plan.unapply();
                                    let imports = match plan {
                                        PeekPlan::SlowPath(plan) => plan
                                            .desc
                                            .source_imports
                                            .into_iter()
                                            .filter_map(|(id, import)| {
                                                import.desc.arguments.operators.map(|mfp| (id, mfp))
                                            })
                                            .collect(),
                                        PeekPlan::FastPath(_) => {
                                            std::collections::BTreeMap::default()
                                        }
                                    };
                                    Ok(Execution::ExplainPushdown {
                                        imports,
                                        determination: determination_for_pushdown
                                            .expect("it's present for the ExplainPushdown case"),
                                    })
                                }
                            }
                        }
                        Ok(Either::Right(global_lir_plan)) => {
                            // COPY TO S3 path
                            Ok(Execution::CopyToS3 {
                                global_lir_plan,
                                source_ids: source_ids_for_closure,
                            })
                        }
                        Err(err) => {
                            if optimizer.is_right() {
                                // COPY TO has no EXPLAIN BROKEN support
                                return Err(err);
                            }
                            // SELECT/EXPLAIN error handling
                            let optimizer = optimizer.expect_left("checked above");
                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
                                if explain_ctx.broken {
                                    // EXPLAIN BROKEN: log error and continue with defaults
                                    tracing::error!(
                                        "error while handling EXPLAIN statement: {}",
                                        err
                                    );
                                    Ok(Execution::ExplainPlan {
                                        df_meta: Default::default(),
                                        explain_ctx,
                                        optimizer,
                                        insights_ctx: None,
                                    })
                                } else {
                                    Err(err)
                                }
                            } else {
                                Err(err)
                            }
                        }
                    }
                })
            },
        );
        let optimization_timeout = *session.vars().statement_timeout();
        let optimization_result =
            // Note: spawn_blocking tasks cannot be cancelled, so on timeout we stop waiting but the
            // optimization task continues running in the background until completion. See
            // https://github.com/MaterializeInc/database-issues/issues/8644 for properly cancelling
            // optimizer runs.
            match tokio::time::timeout(optimization_timeout, optimization_future).await {
                Ok(Ok(result)) => result,
                Ok(Err(optimizer_error)) => {
                    // The blocking task itself failed (e.g. panicked), as opposed to the
                    // optimizer returning an error inside `Execution`.
                    return Err(AdapterError::Internal(format!(
                        "internal error in optimizer: {}",
                        optimizer_error
                    )));
                }
                Err(_elapsed) => {
                    warn!("optimize peek timed out after {:?}", optimization_timeout);
                    return Err(AdapterError::StatementTimeout);
                }
            };
1064
        // Log optimization finished
        if let Some(logging_id) = &statement_logging_id {
            self.log_lifecycle_event(*logging_id, StatementLifecycleEvent::OptimizationFinished);
        }

        // Assert that read holds are correct for the execution plan
        Self::assert_read_holds_correct(
            &read_holds,
            &optimization_result,
            &determination,
            target_cluster_id,
            in_immediate_multi_stmt_txn,
        );
1078
1079        // Handle the optimization result: either generate EXPLAIN output or continue with execution
        // Handle the optimization result: either generate EXPLAIN output or continue with execution
        match optimization_result {
            Execution::ExplainPlan {
                df_meta,
                explain_ctx,
                optimizer,
                insights_ctx,
            } => {
                let rows = coord::sequencer::explain_plan_inner(
                    session,
                    &catalog,
                    df_meta,
                    explain_ctx,
                    optimizer,
                    insights_ctx,
                )
                .await?;

                Ok(Some(ExecuteResponse::SendingRowsImmediate {
                    rows: Box::new(rows.into_row_iter()),
                }))
            }
            Execution::ExplainPushdown {
                imports,
                determination,
            } => {
                // # From peek_explain_pushdown

                let as_of = determination.timestamp_context.antichain();
                // Pin `mz_now()` to the query timestamp when one exists; otherwise allow
                // any value.
                let mz_now = determination
                    .timestamp_context
                    .timestamp()
                    .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
                    .unwrap_or_else(ResultSpec::value_all);

                // NOTE: `explain_pushdown_future_inner` returns a future that itself
                // resolves to a future, hence the double `.await`.
                Ok(Some(
                    coord::sequencer::explain_pushdown_future_inner(
                        session,
                        &*catalog,
                        &self.storage_collections,
                        as_of,
                        mz_now,
                        imports,
                    )
                    .await
                    .await?,
                ))
            }
            Execution::Peek {
                global_lir_plan,
                optimization_finished_at: _optimization_finished_at,
                plan_insights_optimizer_trace,
                insights_ctx,
            } => {
                // Continue with normal execution
                // # From peek_finish

                // The typ here was generated from the HIR SQL type and simply stored in LIR.
                let (peek_plan, df_meta, typ) = global_lir_plan.unapply();

                coord::sequencer::emit_optimizer_notices(
                    &*catalog,
                    session,
                    &df_meta.optimizer_notices,
                );

                // Generate plan insights notice if needed
                if let Some(trace) = plan_insights_optimizer_trace {
                    let target_cluster = catalog.get_cluster(target_cluster_id);
                    let features = OptimizerFeatures::from(catalog.system_config())
                        .override_from(&target_cluster.config.features());
                    let insights = trace
                        .into_plan_insights(
                            &features,
                            &catalog.for_session(session),
                            Some(select_plan.finishing.clone()),
                            Some(target_cluster),
                            df_meta.clone(),
                            insights_ctx,
                        )
                        .await?;
                    session.add_notice(AdapterNotice::PlanInsights(insights));
                }

                // # Now back to peek_finish

                // With statement logging enabled, create a watch set so the statement's
                // lifecycle can be tracked against the peek's timestamp.
                let watch_set = statement_logging_id.map(|logging_id| {
                    WatchSetCreation::new(
                        logging_id,
                        catalog.state(),
                        &input_id_bundle,
                        determination.timestamp_context.timestamp_or_default(),
                    )
                });

                let max_result_size = catalog.system_config().max_result_size();

                // Clone determination if we need it for emit_timestamp_notice, since it may be
                // moved into Command::ExecuteSlowPathPeek.
                let determination_for_notice = if session.vars().emit_timestamp_notice() {
                    Some(determination.clone())
                } else {
                    None
                };

                let response = match peek_plan {
                    PeekPlan::FastPath(fast_path_plan) => {
                        if let Some(logging_id) = &statement_logging_id {
                            // TODO(peek-seq): Actually, we should log it also for
                            // FastPathPlan::Constant. The only reason we are not doing so at the
                            // moment is to match the old peek sequencing, so that statement logging
                            // tests pass with the frontend peek sequencing turned both on and off.
                            //
                            // When the old sequencing is removed, we should make a couple of
                            // changes in how we log timestamps:
                            // - Move this up to just after timestamp determination, so that it
                            //   appears in the log as soon as possible.
                            // - Do it also for Constant peeks.
                            // - Currently, slow-path peeks' timestamp logging is done by
                            //   `implement_peek_plan`. We could remove it from there, and just do
                            //   it here.
                            if !matches!(fast_path_plan, FastPathPlan::Constant(..)) {
                                self.log_set_timestamp(
                                    *logging_id,
                                    determination.timestamp_context.timestamp_or_default(),
                                );
                            }
                        }

                        let row_set_finishing_seconds =
                            session.metrics().row_set_finishing_seconds().clone();

                        let peek_stash_read_batch_size_bytes =
                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
                                .get(catalog.system_config().dyncfgs());
                        let peek_stash_read_memory_budget_bytes =
                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
                                .get(catalog.system_config().dyncfgs());

                        self.implement_fast_path_peek_plan(
                            fast_path_plan,
                            determination.timestamp_context.timestamp_or_default(),
                            select_plan.finishing,
                            target_cluster_id,
                            target_replica,
                            typ,
                            max_result_size,
                            max_query_result_size,
                            row_set_finishing_seconds,
                            read_holds,
                            peek_stash_read_batch_size_bytes,
                            peek_stash_read_memory_budget_bytes,
                            session.conn_id().clone(),
                            source_ids,
                            watch_set,
                        )
                        .await?
                    }
                    PeekPlan::SlowPath(dataflow_plan) => {
                        if let Some(logging_id) = &statement_logging_id {
                            self.log_set_transient_index_id(*logging_id, dataflow_plan.id);
                        }

                        // Slow path: hand the dataflow plan to the coordinator for execution.
                        self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
                            dataflow_plan: Box::new(dataflow_plan),
                            determination,
                            finishing: select_plan.finishing,
                            compute_instance: target_cluster_id,
                            target_replica,
                            intermediate_result_type: typ,
                            source_ids,
                            conn_id: session.conn_id().clone(),
                            max_result_size,
                            max_query_result_size,
                            watch_set,
                            tx,
                        })
                        .await?
                    }
                };

                // Add timestamp notice if emit_timestamp_notice is enabled
                if let Some(determination) = determination_for_notice {
                    let explanation = self
                        .call_coordinator(|tx| Command::ExplainTimestamp {
                            conn_id: session.conn_id().clone(),
                            session_wall_time: session.pcx().wall_time,
                            cluster_id: target_cluster_id,
                            id_bundle: input_id_bundle.clone(),
                            determination,
                            tx,
                        })
                        .await;
                    session.add_notice(AdapterNotice::QueryTimestamp { explanation });
                }

                Ok(Some(match select_plan.copy_to {
                    None => response,
                    // COPY TO STDOUT
                    Some(format) => ExecuteResponse::CopyTo {
                        format,
                        resp: Box::new(response),
                    },
                }))
            }
            Execution::CopyToS3 {
                global_lir_plan,
                source_ids,
            } => {
                let (df_desc, df_meta) = global_lir_plan.unapply();

                coord::sequencer::emit_optimizer_notices(
                    &*catalog,
                    session,
                    &df_meta.optimizer_notices,
                );

                // Extract S3 sink connection info for preflight check
                let sink_id = df_desc.sink_id();
                let sinks = &df_desc.sink_exports;
                if sinks.len() != 1 {
                    return Err(AdapterError::Internal(
                        "expected exactly one copy to s3 sink".into(),
                    ));
                }
                let (_, sink_desc) = sinks
                    .first_key_value()
                    .expect("known to be exactly one copy to s3 sink");
                let s3_sink_connection = match &sink_desc.connection {
                    mz_compute_types::sinks::ComputeSinkConnection::CopyToS3Oneshot(conn) => {
                        conn.clone()
                    }
                    _ => {
                        return Err(AdapterError::Internal(
                            "expected copy to s3 oneshot sink".into(),
                        ));
1316                };
1317
1318                // Perform S3 preflight check in background task (via coordinator).
1319                // This runs slow S3 operations without blocking the coordinator's main task.
1320                self.call_coordinator(|tx| Command::CopyToPreflight {
1321                    s3_sink_connection,
1322                    sink_id,
1323                    tx,
1324                })
1325                .await?;
1326
1327                // Preflight succeeded, now execute the actual COPY TO dataflow
1328                let watch_set = statement_logging_id.map(|logging_id| {
1329                    WatchSetCreation::new(
1330                        logging_id,
1331                        catalog.state(),
1332                        &input_id_bundle,
1333                        determination.timestamp_context.timestamp_or_default(),
1334                    )
1335                });
1336
1337                let response = self
1338                    .call_coordinator(|tx| Command::ExecuteCopyTo {
1339                        df_desc: Box::new(df_desc),
1340                        compute_instance: target_cluster_id,
1341                        target_replica,
1342                        source_ids,
1343                        conn_id: session.conn_id().clone(),
1344                        watch_set,
1345                        tx,
1346                    })
1347                    .await?;
1348
1349                Ok(Some(response))
1350            }
1351        }
1352    }
1353
    /// (Similar to Coordinator::determine_timestamp)
    /// Determines the timestamp for a query, acquires read holds that ensure the
    /// query remains executable at that time, and returns those.
    /// The caller is responsible for eventually dropping those read holds.
    ///
    /// As a side effect, increments the `determine_timestamp` metric, and — for
    /// Strict Serializable queries that cannot respond immediately and have no
    /// real-time-recency timestamp — observes how far the chosen timestamp is
    /// ahead of what Serializable isolation would have picked.
    ///
    /// Note: self is taken &mut because of the lazy fetching in `get_compute_instance_client`.
    pub(crate) async fn frontend_determine_timestamp(
        &mut self,
        session: &Session,
        id_bundle: &CollectionIdBundle,
        when: &QueryWhen,
        compute_instance: ComputeInstanceId,
        timeline_context: &TimelineContext,
        oracle_read_ts: Option<Timestamp>,
        real_time_recency_ts: Option<Timestamp>,
    ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
        // this is copy-pasted from Coordinator

        let isolation_level = session.vars().transaction_isolation();

        // Acquire read holds on everything in `id_bundle` and learn the least
        // valid write frontier (`upper`). A collection lookup error here means
        // a dependency was dropped concurrently.
        let (read_holds, upper) = self
            .acquire_read_holds_and_least_valid_write(id_bundle)
            .await
            .map_err(|err| {
                AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
                    err,
                    compute_instance,
                )
            })?;
        // Pick the timestamp using the same inner logic as the Coordinator,
        // threading through the holds we just acquired.
        let (det, read_holds) = <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
            session,
            id_bundle,
            when,
            timeline_context,
            oracle_read_ts,
            real_time_recency_ts,
            isolation_level,
            read_holds,
            upper.clone(),
        )?;

        // Count this determination, labeled by whether we can respond
        // immediately, the isolation level, and the target cluster.
        session
            .metrics()
            .determine_timestamp(&[
                match det.respond_immediately() {
                    true => "true",
                    false => "false",
                },
                isolation_level.as_str(),
                &compute_instance.to_string(),
            ])
            .inc();
        if !det.respond_immediately()
            && isolation_level == &IsolationLevel::StrictSerializable
            && real_time_recency_ts.is_none()
        {
            // Note down the difference between StrictSerializable and Serializable into a metric.
            if let Some(strict) = det.timestamp_context.timestamp() {
                // Re-run the determination with identical inputs but Serializable
                // isolation; the read holds acquired by this call are temporary
                // and dropped right away.
                let (serializable_det, _tmp_read_holds) =
                    <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
                        session,
                        id_bundle,
                        when,
                        timeline_context,
                        oracle_read_ts,
                        real_time_recency_ts,
                        &IsolationLevel::Serializable,
                        read_holds.clone(),
                        upper,
                    )?;
                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
                    session
                        .metrics()
                        .timestamp_difference_for_strict_serializable_ms(&[compute_instance
                            .to_string()
                            .as_ref()])
                        .observe(f64::cast_lossy(u64::from(
                            strict.saturating_sub(*serializable),
                        )));
                }
            }
        }

        Ok((det, read_holds))
    }
1439
1440    fn assert_read_holds_correct(
1441        read_holds: &ReadHolds<Timestamp>,
1442        execution: &Execution,
1443        determination: &TimestampDetermination<Timestamp>,
1444        target_cluster_id: ClusterId,
1445        in_immediate_multi_stmt_txn: bool,
1446    ) {
1447        // Extract source_imports, index_imports, as_of, and execution_name based on Execution variant
1448        let (source_imports, index_imports, as_of, execution_name): (
1449            Vec<GlobalId>,
1450            Vec<GlobalId>,
1451            Timestamp,
1452            &str,
1453        ) = match execution {
1454            Execution::Peek {
1455                global_lir_plan, ..
1456            } => match global_lir_plan.peek_plan() {
1457                PeekPlan::FastPath(fast_path_plan) => {
1458                    let (sources, indexes) = match fast_path_plan {
1459                        FastPathPlan::Constant(..) => (vec![], vec![]),
1460                        FastPathPlan::PeekExisting(_coll_id, idx_id, ..) => (vec![], vec![*idx_id]),
1461                        FastPathPlan::PeekPersist(global_id, ..) => (vec![*global_id], vec![]),
1462                    };
1463                    (
1464                        sources,
1465                        indexes,
1466                        determination.timestamp_context.timestamp_or_default(),
1467                        "FastPath",
1468                    )
1469                }
1470                PeekPlan::SlowPath(dataflow_plan) => {
1471                    let as_of = dataflow_plan
1472                        .desc
1473                        .as_of
1474                        .clone()
1475                        .expect("dataflow has an as_of")
1476                        .into_element();
1477                    (
1478                        dataflow_plan.desc.source_imports.keys().cloned().collect(),
1479                        dataflow_plan.desc.index_imports.keys().cloned().collect(),
1480                        as_of,
1481                        "SlowPath",
1482                    )
1483                }
1484            },
1485            Execution::CopyToS3 {
1486                global_lir_plan, ..
1487            } => {
1488                let df_desc = global_lir_plan.df_desc();
1489                let as_of = df_desc
1490                    .as_of
1491                    .clone()
1492                    .expect("dataflow has an as_of")
1493                    .into_element();
1494                (
1495                    df_desc.source_imports.keys().cloned().collect(),
1496                    df_desc.index_imports.keys().cloned().collect(),
1497                    as_of,
1498                    "CopyToS3",
1499                )
1500            }
1501            Execution::ExplainPlan { .. } | Execution::ExplainPushdown { .. } => {
1502                // No read holds assertions needed for EXPLAIN variants
1503                return;
1504            }
1505        };
1506
1507        // Assert that we have some read holds for all the imports of the dataflow.
1508        for id in source_imports.iter() {
1509            soft_assert_or_log!(
1510                read_holds.storage_holds.contains_key(id),
1511                "[{}] missing read hold for the source import {}; (in_immediate_multi_stmt_txn: {})",
1512                execution_name,
1513                id,
1514                in_immediate_multi_stmt_txn,
1515            );
1516        }
1517        for id in index_imports.iter() {
1518            soft_assert_or_log!(
1519                read_holds
1520                    .compute_ids()
1521                    .map(|(_instance, coll)| coll)
1522                    .contains(id),
1523                "[{}] missing read hold for the index import {}; (in_immediate_multi_stmt_txn: {})",
1524                execution_name,
1525                id,
1526                in_immediate_multi_stmt_txn,
1527            );
1528        }
1529
1530        // Also check the holds against the as_of.
1531        for (id, h) in read_holds.storage_holds.iter() {
1532            soft_assert_or_log!(
1533                h.since().less_equal(&as_of),
1534                "[{}] storage read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1535                execution_name,
1536                h.since(),
1537                id,
1538                as_of,
1539                determination,
1540                in_immediate_multi_stmt_txn,
1541            );
1542        }
1543        for ((instance, id), h) in read_holds.compute_holds.iter() {
1544            soft_assert_eq_or_log!(
1545                *instance,
1546                target_cluster_id,
1547                "[{}] the read hold on {} is on the wrong cluster; (in_immediate_multi_stmt_txn: {})",
1548                execution_name,
1549                id,
1550                in_immediate_multi_stmt_txn,
1551            );
1552            soft_assert_or_log!(
1553                h.since().less_equal(&as_of),
1554                "[{}] compute read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1555                execution_name,
1556                h.since(),
1557                id,
1558                as_of,
1559                determination,
1560                in_immediate_multi_stmt_txn,
1561            );
1562        }
1563    }
1564}
1565
/// Enum for branching among various execution steps after optimization
enum Execution {
    /// Execute an optimized peek (fast or slow path), optionally wrapped as
    /// COPY TO STDOUT.
    Peek {
        /// The fully optimized (LIR) peek plan.
        global_lir_plan: optimize::peek::GlobalLirPlan,
        /// Wall-clock time (epoch milliseconds) at which optimization finished.
        optimization_finished_at: EpochMillis,
        /// Optimizer trace captured for plan insights, if requested.
        plan_insights_optimizer_trace: Option<OptimizerTrace>,
        /// Context for computing plan insights, if any.
        insights_ctx: Option<Box<PlanInsightsContext>>,
    },
    /// Execute a COPY ... TO an S3 location via a oneshot sink dataflow.
    CopyToS3 {
        /// The fully optimized (LIR) COPY TO plan.
        global_lir_plan: optimize::copy_to::GlobalLirPlan,
        /// IDs of the collections the COPY reads from; forwarded to the
        /// coordinator when executing the dataflow.
        source_ids: BTreeSet<GlobalId>,
    },
    /// Respond to an EXPLAIN of a (peek) plan; no dataflow is installed.
    ExplainPlan {
        /// Dataflow metainformation (e.g. optimizer notices) produced by
        /// optimization.
        df_meta: DataflowMetainfo,
        /// Context describing the requested EXPLAIN output.
        explain_ctx: ExplainPlanContext,
        /// The optimizer used, kept around for rendering the explanation.
        optimizer: optimize::peek::Optimizer,
        /// Context for computing plan insights, if any.
        insights_ctx: Option<Box<PlanInsightsContext>>,
    },
    /// Respond to an EXPLAIN FILTER PUSHDOWN; no dataflow is installed.
    ExplainPushdown {
        /// Per-import MapFilterProject expressions whose pushdown is being
        /// explained.
        imports: BTreeMap<GlobalId, mz_expr::MapFilterProject>,
        /// The timestamp determination the explanation is evaluated at.
        determination: TimestampDetermination<Timestamp>,
    },
}