// mz_adapter/frontend_peek.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::collections::BTreeSet;
12use std::sync::Arc;
13
14use itertools::{Either, Itertools};
15use mz_compute_types::ComputeInstanceId;
16use mz_controller_types::ClusterId;
17use mz_expr::{CollectionPlan, ResultSpec};
18use mz_ore::cast::{CastFrom, CastLossy};
19use mz_ore::collections::CollectionExt;
20use mz_ore::now::EpochMillis;
21use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
22use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
23use mz_repr::role_id::RoleId;
24use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
25use mz_sql::ast::Raw;
26use mz_sql::catalog::CatalogCluster;
27use mz_sql::plan::Params;
28use mz_sql::plan::{self, Explainee, ExplaineeStatement, Plan, QueryWhen};
29use mz_sql::rbac;
30use mz_sql::session::metadata::SessionMetadata;
31use mz_sql::session::vars::IsolationLevel;
32use mz_sql_parser::ast::{CopyDirection, CopyRelation, ExplainStage, ShowStatement, Statement};
33use mz_transform::EmptyStatisticsOracle;
34use mz_transform::dataflow::DataflowMetainfo;
35use opentelemetry::trace::TraceContextExt;
36use tracing::{Span, debug, warn};
37use tracing_opentelemetry::OpenTelemetrySpanExt;
38
39use crate::catalog::Catalog;
40use crate::command::Command;
41use crate::coord::peek::{FastPathPlan, PeekPlan};
42use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle};
43use crate::coord::timeline::timedomain_for;
44use crate::coord::timestamp_selection::TimestampDetermination;
45use crate::coord::{
46 Coordinator, CopyToContext, ExecuteContextGuard, ExplainContext, ExplainPlanContext,
47 TargetCluster,
48};
49use crate::explain::insights::PlanInsightsContext;
50use crate::explain::optimizer_trace::OptimizerTrace;
51use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
52use crate::optimize::{Optimize, OptimizerError};
53use crate::session::{Session, TransactionOps, TransactionStatus};
54use crate::statement_logging::WatchSetCreation;
55use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
56use crate::{
57 AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, PeekClient, ReadHolds,
58 TimelineContext, TimestampContext, TimestampProvider, optimize,
59};
60use crate::{coord, metrics};
61
62impl PeekClient {
63 /// Attempt to sequence a peek from the session task.
64 ///
65 /// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the
66 /// Coordinator's sequencing. If it returns an error, it should be returned to the user.
67 ///
68 /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
69 /// triggering the execution of the underlying query.
70 pub(crate) async fn try_frontend_peek(
71 &mut self,
72 portal_name: &str,
73 session: &mut Session,
74 outer_ctx_extra: &mut Option<ExecuteContextGuard>,
75 ) -> Result<Option<ExecuteResponse>, AdapterError> {
76 // # From handle_execute
77
78 if session.vars().emit_trace_id_notice() {
79 let span_context = tracing::Span::current()
80 .context()
81 .span()
82 .span_context()
83 .clone();
84 if span_context.is_valid() {
85 session.add_notice(AdapterNotice::QueryTrace {
86 trace_id: span_context.trace_id(),
87 });
88 }
89 }
90
91 // TODO(peek-seq): This snapshot is wasted when we end up bailing out from the frontend peek
92 // sequencing. We could solve this is with that optimization where we
93 // continuously keep a catalog snapshot in the session, and only get a new one when the
94 // catalog revision has changed, which we could see with an atomic read.
95 // But anyhow, this problem will just go away when we reach the point that we never fall
96 // back to the old sequencing.
97 let catalog = self.catalog_snapshot("try_frontend_peek").await;
98
99 // Extract things from the portal.
100 let (stmt, params, logging, lifecycle_timestamps) = {
101 if let Err(err) = Coordinator::verify_portal(&*catalog, session, portal_name) {
102 outer_ctx_extra
103 .take()
104 .and_then(|guard| guard.defuse().retire());
105 return Err(err);
106 }
107 let portal = session
108 .get_portal_unverified(portal_name)
109 // The portal is a session-level thing, so it couldn't have concurrently disappeared
110 // since the above verification.
111 .expect("called verify_portal above");
112 let params = portal.parameters.clone();
113 let stmt = portal.stmt.clone();
114 let logging = Arc::clone(&portal.logging);
115 let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
116 (stmt, params, logging, lifecycle_timestamps)
117 };
118
119 // Before planning, check if this is a statement type we can handle.
120 // This must happen BEFORE statement logging setup to avoid orphaned execution records.
121 if let Some(ref stmt) = stmt {
122 match &**stmt {
123 Statement::Select(_)
124 | Statement::ExplainAnalyzeObject(_)
125 | Statement::ExplainAnalyzeCluster(_)
126 | Statement::Show(ShowStatement::ShowObjects(_))
127 | Statement::Show(ShowStatement::ShowColumns(_)) => {
128 // These are always fine, just continue.
129 // Note: EXPLAIN ANALYZE will `plan` to `Plan::Select`.
130 // Note: ShowObjects plans to `Plan::Select`, ShowColumns plans to `Plan::ShowColumns`.
131 // We handle `Plan::ShowColumns` specially in `try_frontend_peek_inner`.
132 }
133 Statement::ExplainPlan(explain_stmt) => {
134 // Only handle ExplainPlan for SELECT statements.
135 // We don't want to handle e.g. EXPLAIN CREATE MATERIALIZED VIEW here, because that
136 // requires purification before planning, which the frontend peek sequencing doesn't
137 // do.
138 match &explain_stmt.explainee {
139 mz_sql_parser::ast::Explainee::Select(..) => {
140 // This is a SELECT, continue
141 }
142 _ => {
143 debug!(
144 "Bailing out from try_frontend_peek, because EXPLAIN is not for a SELECT query"
145 );
146 return Ok(None);
147 }
148 }
149 }
150 Statement::ExplainPushdown(explain_stmt) => {
151 // Only handle EXPLAIN FILTER PUSHDOWN for non-BROKEN SELECT statements
152 match &explain_stmt.explainee {
153 mz_sql_parser::ast::Explainee::Select(_, false) => {}
154 _ => {
155 debug!(
156 "Bailing out from try_frontend_peek, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is for EXPLAIN BROKEN"
157 );
158 return Ok(None);
159 }
160 }
161 }
162 Statement::Copy(copy_stmt) => {
163 match ©_stmt.direction {
164 CopyDirection::To => {
165 // Check for SUBSCRIBE inside COPY TO - we don't handle Plan::Subscribe
166 if matches!(©_stmt.relation, CopyRelation::Subscribe(_)) {
167 debug!(
168 "Bailing out from try_frontend_peek, because COPY (SUBSCRIBE ...) TO is not supported"
169 );
170 return Ok(None);
171 }
172 // This is COPY TO (SELECT), continue
173 }
174 CopyDirection::From => {
175 debug!(
176 "Bailing out from try_frontend_peek, because COPY FROM is not supported"
177 );
178 return Ok(None);
179 }
180 }
181 }
182 _ => {
183 debug!(
184 "Bailing out from try_frontend_peek, because statement type is not supported"
185 );
186 return Ok(None);
187 }
188 }
189 }
190
191 // Set up statement logging, and log the beginning of execution.
192 // (But only if we're not executing in the context of another statement.)
193 let statement_logging_id = if outer_ctx_extra.is_none() {
194 // This is a new statement, so begin statement logging
195 let result = self.statement_logging_frontend.begin_statement_execution(
196 session,
197 ¶ms,
198 &logging,
199 catalog.system_config(),
200 lifecycle_timestamps,
201 );
202
203 if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result {
204 self.log_began_execution(began_execution, mseh_update, prepared_statement);
205 Some(logging_id)
206 } else {
207 None
208 }
209 } else {
210 // We're executing in the context of another statement (e.g., FETCH),
211 // so extract the statement logging ID from the outer context if present.
212 // We take ownership and retire the outer context here. The end of execution will be
213 // logged in one of the following ways:
214 // - At the end of this function, if the execution is finished by then.
215 // - Later by the Coordinator, either due to RegisterFrontendPeek or ExecuteSlowPathPeek.
216 outer_ctx_extra
217 .take()
218 .and_then(|guard| guard.defuse().retire())
219 };
220
221 let result = self
222 .try_frontend_peek_inner(session, catalog, stmt, params, statement_logging_id)
223 .await;
224
225 // Log the end of execution if we are logging this statement and execution has already
226 // ended.
227 if let Some(logging_id) = statement_logging_id {
228 let reason = match &result {
229 // Streaming results are handled asynchronously by the coordinator
230 Ok(Some(ExecuteResponse::SendingRowsStreaming { .. })) => {
231 // Don't log here - the peek is still executing.
232 // It will be logged when handle_peek_notification is called.
233 return result;
234 }
235 // COPY TO needs to check its inner response
236 Ok(Some(resp @ ExecuteResponse::CopyTo { resp: inner, .. })) => {
237 match inner.as_ref() {
238 ExecuteResponse::SendingRowsStreaming { .. } => {
239 // Don't log here - the peek is still executing.
240 // It will be logged when handle_peek_notification is called.
241 return result;
242 }
243 // For non-streaming COPY TO responses, use the outer CopyTo for conversion
244 _ => resp.into(),
245 }
246 }
247 // Bailout case, which should not happen
248 Ok(None) => {
249 soft_panic_or_log!(
250 "Bailed out from `try_frontend_peek_inner` after we already logged the beginning of statement execution."
251 );
252 // This statement will be handled by the old peek sequencing, which will do its
253 // own statement logging from the beginning. So, let's close out this one.
254 self.log_ended_execution(
255 logging_id,
256 StatementEndedExecutionReason::Errored {
257 error: "Internal error: bailed out from `try_frontend_peek_inner`"
258 .to_string(),
259 },
260 );
261 return result;
262 }
263 // All other success responses - use the From implementation
264 // TODO(peek-seq): After we delete the old peek sequencing, we'll be able to adjust
265 // the From implementation to do exactly what we need in the frontend peek
266 // sequencing, so that the above special cases won't be needed.
267 Ok(Some(resp)) => resp.into(),
268 Err(e) => StatementEndedExecutionReason::Errored {
269 error: e.to_string(),
270 },
271 };
272
273 self.log_ended_execution(logging_id, reason);
274 }
275
276 result
277 }
278
279 /// This is encapsulated in an inner function so that the outer function can still do statement
280 /// logging after the `?` returns of the inner function.
281 async fn try_frontend_peek_inner(
282 &mut self,
283 session: &mut Session,
284 catalog: Arc<Catalog>,
285 stmt: Option<Arc<Statement<Raw>>>,
286 params: Params,
287 statement_logging_id: Option<crate::statement_logging::StatementLoggingId>,
288 ) -> Result<Option<ExecuteResponse>, AdapterError> {
289 let stmt = match stmt {
290 Some(stmt) => stmt,
291 None => {
292 debug!("try_frontend_peek_inner succeeded on an empty query");
293 return Ok(Some(ExecuteResponse::EmptyQuery));
294 }
295 };
296
297 session
298 .metrics()
299 .query_total(&[
300 metrics::session_type_label_value(session.user()),
301 metrics::statement_type_label_value(&stmt),
302 ])
303 .inc();
304
305 // # From handle_execute_inner
306
307 let conn_catalog = catalog.for_session(session);
308 // (`resolved_ids` should be derivable from `stmt`. If `stmt` is later transformed to
309 // remove/add IDs, then `resolved_ids` should be updated to also remove/add those IDs.)
310 let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?;
311
312 let pcx = session.pcx();
        let plan = mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, &params, &resolved_ids)?;
314
315 let (select_plan, explain_ctx, copy_to_ctx) = match &plan {
316 Plan::Select(select_plan) => {
317 let explain_ctx = if session.vars().emit_plan_insights_notice() {
318 let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
319 ExplainContext::PlanInsightsNotice(optimizer_trace)
320 } else {
321 ExplainContext::None
322 };
323 (select_plan, explain_ctx, None)
324 }
325 Plan::ShowColumns(show_columns_plan) => {
326 // ShowColumns wraps a SelectPlan, extract it and proceed as normal.
327 (&show_columns_plan.select_plan, ExplainContext::None, None)
328 }
329 Plan::ExplainPlan(plan::ExplainPlanPlan {
330 stage,
331 format,
332 config,
333 explainee: Explainee::Statement(ExplaineeStatement::Select { broken, plan, desc }),
334 }) => {
335 // Create OptimizerTrace to collect optimizer plans
336 let optimizer_trace = OptimizerTrace::new(stage.paths());
337 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
338 broken: *broken,
339 config: config.clone(),
340 format: *format,
341 stage: *stage,
342 replan: None,
343 desc: Some(desc.clone()),
344 optimizer_trace,
345 });
346 (plan, explain_ctx, None)
347 }
348 // COPY TO S3
349 Plan::CopyTo(plan::CopyToPlan {
350 select_plan,
351 desc,
352 to,
353 connection,
354 connection_id,
355 format,
356 max_file_size,
357 }) => {
358 let uri = eval_copy_to_uri(to.clone(), session, catalog.state())?;
359
360 // (output_batch_count will be set later)
361 let copy_to_ctx = CopyToContext {
362 desc: desc.clone(),
363 uri,
364 connection: connection.clone(),
365 connection_id: *connection_id,
366 format: format.clone(),
367 max_file_size: *max_file_size,
368 output_batch_count: None,
369 };
370
371 (select_plan, ExplainContext::None, Some(copy_to_ctx))
372 }
373 Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => {
374 // Only handle EXPLAIN FILTER PUSHDOWN for SELECT statements
375 match explainee {
376 plan::Explainee::Statement(plan::ExplaineeStatement::Select {
377 broken: false,
378 plan,
379 desc: _,
380 }) => {
381 let explain_ctx = ExplainContext::Pushdown;
382 (plan, explain_ctx, None)
383 }
384 _ => {
385 // This shouldn't happen because we already checked for this at the AST
386 // level before calling `try_frontend_peek_inner`.
387 soft_panic_or_log!(
388 "unexpected EXPLAIN FILTER PUSHDOWN plan kind in frontend peek sequencing: {:?}",
389 explainee
390 );
391 debug!(
392 "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN"
393 );
394 return Ok(None);
395 }
396 }
397 }
398 Plan::SideEffectingFunc(sef_plan) => {
399 // Side-effecting functions need Coordinator state (e.g., active_conns),
400 // so delegate to the Coordinator via a Command.
401 // The RBAC check is performed in the Coordinator where active_conns is available.
402 let response = self
403 .call_coordinator(|tx| Command::ExecuteSideEffectingFunc {
404 plan: sef_plan.clone(),
405 conn_id: session.conn_id().clone(),
406 current_role: session.role_metadata().current_role,
407 tx,
408 })
409 .await?;
410 return Ok(Some(response));
411 }
412 _ => {
413 // This shouldn't happen because we already checked for this at the AST
414 // level before calling `try_frontend_peek_inner`.
415 soft_panic_or_log!(
416 "Unexpected plan kind in frontend peek sequencing: {:?}",
417 plan
418 );
419 debug!(
420 "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, side-effecting SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO S3"
421 );
422 return Ok(None);
423 }
424 };
425
426 // # From sequence_plan
427
428 // We have checked the plan kind above.
429 assert!(plan.allowed_in_read_only());
430
431 let target_cluster = match session.transaction().cluster() {
432 // Use the current transaction's cluster.
433 Some(cluster_id) => TargetCluster::Transaction(cluster_id),
434 // If there isn't a current cluster set for a transaction, then try to auto route.
435 None => {
436 coord::catalog_serving::auto_run_on_catalog_server(&conn_catalog, session, &plan)
437 }
438 };
439 let (cluster, target_cluster_id, target_cluster_name) = {
440 let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
441 (cluster, cluster.id, &cluster.name)
442 };
443
444 // Log cluster selection
445 if let Some(logging_id) = &statement_logging_id {
446 self.log_set_cluster(*logging_id, target_cluster_id);
447 }
448
449 coord::catalog_serving::check_cluster_restrictions(
450 target_cluster_name.as_str(),
451 &conn_catalog,
452 &plan,
453 )?;
454
455 rbac::check_plan(
456 &conn_catalog,
457 // We can't look at `active_conns` here, but that's ok, because this case was handled
458 // above already inside `Command::ExecuteSideEffectingFunc`.
459 None::<fn(u32) -> Option<RoleId>>,
460 session,
461 &plan,
462 Some(target_cluster_id),
463 &resolved_ids,
464 )?;
465
466 if let Some((_, wait_future)) =
467 coord::appends::waiting_on_startup_appends(&*catalog, session, &plan)
468 {
469 wait_future.await;
470 }
471
472 let max_query_result_size = Some(session.vars().max_query_result_size());
473
474 // # From sequence_peek
475
476 // # From peek_validate
477
478 let compute_instance_snapshot =
479 ComputeInstanceSnapshot::new_without_collections(cluster.id());
480
481 let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config())
482 .override_from(&catalog.get_cluster(cluster.id()).config.features())
483 .override_from(&explain_ctx);
484
485 if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
486 return Err(AdapterError::NoClusterReplicasAvailable {
487 name: cluster.name.clone(),
488 is_managed: cluster.is_managed(),
489 });
490 }
491
492 let (_, view_id) = self.transient_id_gen.allocate_id();
493 let (_, index_id) = self.transient_id_gen.allocate_id();
494
495 let mut optimizer = if let Some(mut copy_to_ctx) = copy_to_ctx {
496 // COPY TO path: calculate output_batch_count and create copy_to optimizer
497 let worker_counts = cluster.replicas().map(|r| {
498 let loc = &r.config.location;
499 loc.workers().unwrap_or_else(|| loc.num_processes())
500 });
501 let max_worker_count = match worker_counts.max() {
502 Some(count) => u64::cast_from(count),
503 None => {
504 return Err(AdapterError::NoClusterReplicasAvailable {
505 name: cluster.name.clone(),
506 is_managed: cluster.is_managed(),
507 });
508 }
509 };
510 copy_to_ctx.output_batch_count = Some(max_worker_count);
511
512 Either::Right(optimize::copy_to::Optimizer::new(
513 Arc::clone(&catalog),
514 compute_instance_snapshot.clone(),
515 view_id,
516 copy_to_ctx,
517 optimizer_config,
518 self.optimizer_metrics.clone(),
519 ))
520 } else {
521 // SELECT/EXPLAIN path: create peek optimizer
522 Either::Left(optimize::peek::Optimizer::new(
523 Arc::clone(&catalog),
524 compute_instance_snapshot.clone(),
525 select_plan.finishing.clone(),
526 view_id,
527 index_id,
528 optimizer_config,
529 self.optimizer_metrics.clone(),
530 ))
531 };
532
533 let target_replica_name = session.vars().cluster_replica();
534 let mut target_replica = target_replica_name
535 .map(|name| {
536 cluster
537 .replica_id(name)
538 .ok_or(AdapterError::UnknownClusterReplica {
539 cluster_name: cluster.name.clone(),
540 replica_name: name.to_string(),
541 })
542 })
543 .transpose()?;
544
545 let source_ids = select_plan.source.depends_on();
546 // TODO(peek-seq): validate_timeline_context can be expensive in real scenarios (not in
547 // simple benchmarks), because it traverses transitive dependencies even of indexed views and
548 // materialized views (also traversing their MIR plans).
549 let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?;
550 if matches!(timeline_context, TimelineContext::TimestampIndependent)
551 && select_plan.source.contains_temporal()?
552 {
553 // If the source IDs are timestamp independent but the query contains temporal functions,
554 // then the timeline context needs to be upgraded to timestamp dependent. This is
555 // required because `source_ids` doesn't contain functions.
556 timeline_context = TimelineContext::TimestampDependent;
557 }
558
559 let notices = coord::sequencer::check_log_reads(
560 &catalog,
561 cluster,
562 &source_ids,
563 &mut target_replica,
564 session.vars(),
565 )?;
566 session.add_notices(notices);
567
568 // # From peek_linearize_timestamp
569
570 let isolation_level = session.vars().transaction_isolation().clone();
571 let timeline = Coordinator::get_timeline(&timeline_context);
572 let needs_linearized_read_ts =
573 Coordinator::needs_linearized_read_ts(&isolation_level, &select_plan.when);
574
575 let oracle_read_ts = match timeline {
576 Some(timeline) if needs_linearized_read_ts => {
577 let oracle = self.ensure_oracle(timeline).await?;
578 let oracle_read_ts = oracle.read_ts().await;
579 Some(oracle_read_ts)
580 }
581 Some(_) | None => None,
582 };
583
584 // # From peek_real_time_recency
585
586 let vars = session.vars();
587 let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
588 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
589 && !session.contains_read_timestamp()
590 {
591 // Only call the coordinator when we actually need real-time recency
592 self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
593 source_ids: source_ids.clone(),
594 real_time_recency_timeout: *vars.real_time_recency_timeout(),
595 tx,
596 })
597 .await?
598 } else {
599 None
600 };
601
602 // # From peek_timestamp_read_hold
603
604 let dataflow_builder = DataflowBuilder::new(catalog.state(), compute_instance_snapshot);
605 let input_id_bundle = dataflow_builder.sufficient_collections(source_ids.clone());
606
607 // ## From sequence_peek_timestamp
608
609 // Warning: This will be false for AS OF queries, even if we are otherwise inside a
610 // multi-statement transaction. (It's also false for FreshestTableWrite, which is currently
611 // only read-then-write queries, which can't be part of multi-statement transactions, so
612 // FreshestTableWrite doesn't matter.)
613 //
614 // TODO(peek-seq): It's not totally clear to me what the intended semantics are for AS OF
615 // queries inside a transaction: We clearly can't use the transaction timestamp, but the old
616 // peek sequencing still does a timedomain validation. The new peek sequencing does not do
617 // timedomain validation for AS OF queries, which seems more natural. But I'm thinking that
618 // it would be the cleanest to just simply disallow AS OF queries inside transactions.
619 let in_immediate_multi_stmt_txn = session
620 .transaction()
621 .in_immediate_multi_stmt_txn(&select_plan.when);
622
623 // Fetch or generate a timestamp for this query and fetch or acquire read holds.
624 let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
625 // Use the transaction's timestamp if it exists and this isn't an AS OF query.
626 // (`in_immediate_multi_stmt_txn` is false for AS OF queries.)
627 Some(
628 determination @ TimestampDetermination {
629 timestamp_context: TimestampContext::TimelineTimestamp { .. },
630 ..
631 },
632 ) if in_immediate_multi_stmt_txn => {
633 // This is a subsequent (non-AS OF, non-constant) query in a multi-statement
634 // transaction. We now:
635 // - Validate that the query only accesses collections within the transaction's
636 // timedomain (which we know from the stored read holds).
637 // - Use the transaction's stored timestamp determination.
638 // - Use the (relevant subset of the) transaction's read holds.
639
640 let txn_read_holds_opt = self
641 .call_coordinator(|tx| Command::GetTransactionReadHoldsBundle {
642 conn_id: session.conn_id().clone(),
643 tx,
644 })
645 .await;
646
647 if let Some(txn_read_holds) = txn_read_holds_opt {
648 let allowed_id_bundle = txn_read_holds.id_bundle();
649 let outside = input_id_bundle.difference(&allowed_id_bundle);
650
651 // Queries without a timestamp and timeline can belong to any existing timedomain.
652 if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
653 let valid_names =
654 allowed_id_bundle.resolve_names(&*catalog, session.conn_id());
655 let invalid_names = outside.resolve_names(&*catalog, session.conn_id());
656 return Err(AdapterError::RelationOutsideTimeDomain {
657 relations: invalid_names,
658 names: valid_names,
659 });
660 }
661
662 // Extract the subset of read holds for the collections this query accesses.
663 let read_holds = txn_read_holds.subset(&input_id_bundle);
664
665 (determination, read_holds)
666 } else {
667 // This should never happen: we're in a subsequent query of a multi-statement
668 // transaction (we have a transaction timestamp), but the coordinator has no
669 // transaction read holds stored. This indicates a bug in the transaction
670 // handling.
671 return Err(AdapterError::Internal(
672 "Missing transaction read holds for multi-statement transaction"
673 .to_string(),
674 ));
675 }
676 }
677 _ => {
678 // There is no timestamp determination yet for this transaction. Either:
679 // - We are not in a multi-statement transaction.
680 // - This is the first (non-AS OF) query in a multi-statement transaction.
681 // - This is an AS OF query.
682 // - This is a constant query (`TimestampContext::NoTimestamp`).
683
684 let timedomain_bundle;
685 let determine_bundle = if in_immediate_multi_stmt_txn {
686 // This is the first (non-AS OF) query in a multi-statement transaction.
687 // Determine a timestamp that will be valid for anything in any schema
688 // referenced by the first query.
689 timedomain_bundle = timedomain_for(
690 &*catalog,
691 &dataflow_builder,
692 &source_ids,
693 &timeline_context,
694 session.conn_id(),
695 target_cluster_id,
696 )?;
697 &timedomain_bundle
698 } else {
699 // Simply use the inputs of the current query.
700 &input_id_bundle
701 };
702 let (determination, read_holds) = self
703 .frontend_determine_timestamp(
704 session,
705 determine_bundle,
706 &select_plan.when,
707 target_cluster_id,
708 &timeline_context,
709 oracle_read_ts,
710 real_time_recency_ts,
711 )
712 .await?;
713
714 // If this is the first (non-AS OF) query in a multi-statement transaction, store
715 // the read holds in the coordinator, so subsequent queries can validate against
716 // them.
717 if in_immediate_multi_stmt_txn {
718 self.call_coordinator(|tx| Command::StoreTransactionReadHolds {
719 conn_id: session.conn_id().clone(),
720 read_holds: read_holds.clone(),
721 tx,
722 })
723 .await;
724 }
725
726 (determination, read_holds)
727 }
728 };
729
730 {
731 // Assert that we have a read hold for all the collections in our `input_id_bundle`.
732 for id in input_id_bundle.iter() {
733 let s = read_holds.storage_holds.contains_key(&id);
734 let c = read_holds
735 .compute_ids()
736 .map(|(_instance, coll)| coll)
737 .contains(&id);
738 soft_assert_or_log!(
739 s || c,
740 "missing read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
741 id,
742 in_immediate_multi_stmt_txn,
743 );
744 }
745
746 // Assert that each part of the `input_id_bundle` corresponds to the right part of
747 // `read_holds`.
748 for id in input_id_bundle.storage_ids.iter() {
749 soft_assert_or_log!(
750 read_holds.storage_holds.contains_key(id),
751 "missing storage read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
752 id,
753 in_immediate_multi_stmt_txn,
754 );
755 }
756 for id in input_id_bundle
757 .compute_ids
758 .iter()
759 .flat_map(|(_instance, colls)| colls)
760 {
761 soft_assert_or_log!(
762 read_holds
763 .compute_ids()
764 .map(|(_instance, coll)| coll)
765 .contains(id),
766 "missing compute read hold for collection {} in `input_id_bundle`; (in_immediate_multi_stmt_txn: {})",
767 id,
768 in_immediate_multi_stmt_txn,
769 );
770 }
771 }
772
773 // (TODO(peek-seq): The below TODO is copied from the old peek sequencing. We should resolve
774 // this when we decide what to with `AS OF` in transactions.)
775 // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
776 // arbitrary and we don't recall why we did it (possibly an error!). Change this to always
777 // set the transaction ops. Decide and document what our policy should be on AS OF queries.
778 // Maybe they shouldn't be allowed in transactions at all because it's hard to explain
779 // what's going on there. This should probably get a small design document.
780
781 // We only track the peeks in the session if the query doesn't use AS
782 // OF or we're inside an explicit transaction. The latter case is
783 // necessary to support PG's `BEGIN` semantics, whose behavior can
784 // depend on whether or not reads have occurred in the txn.
785 let requires_linearization = (&explain_ctx).into();
786 let mut transaction_determination = determination.clone();
787 if select_plan.when.is_transactional() {
788 session.add_transaction_ops(TransactionOps::Peeks {
789 determination: transaction_determination,
790 cluster_id: target_cluster_id,
791 requires_linearization,
792 })?;
793 } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
794 // If the query uses AS OF, then ignore the timestamp.
795 transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
796 session.add_transaction_ops(TransactionOps::Peeks {
797 determination: transaction_determination,
798 cluster_id: target_cluster_id,
799 requires_linearization,
800 })?;
801 };
802
803 // # From peek_optimize
804
805 let stats = statistics_oracle(
806 session,
807 &source_ids,
808 &determination.timestamp_context.antichain(),
809 true,
810 catalog.system_config(),
811 &*self.storage_collections,
812 )
813 .await
814 .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
815
816 // Generate data structures that can be moved to another task where we will perform possibly
817 // expensive optimizations.
818 let timestamp_context = determination.timestamp_context.clone();
819 let session_meta = session.meta();
820 let now = catalog.config().now.clone();
821 let select_plan = select_plan.clone();
822 let target_cluster_name = target_cluster_name.clone();
823 let needs_plan_insights = explain_ctx.needs_plan_insights();
824 let determination_for_pushdown = if matches!(explain_ctx, ExplainContext::Pushdown) {
825 // This is a hairy data structure, so avoid this clone if we are not in
826 // EXPLAIN FILTER PUSHDOWN.
827 Some(determination.clone())
828 } else {
829 None
830 };
831
832 let span = Span::current();
833
834 // Prepare data for plan insights if needed
835 let catalog_for_insights = if needs_plan_insights {
836 Some(Arc::clone(&catalog))
837 } else {
838 None
839 };
840 let mut compute_instances = BTreeMap::new();
841 if needs_plan_insights {
842 for user_cluster in catalog.user_clusters() {
843 let snapshot = ComputeInstanceSnapshot::new_without_collections(user_cluster.id);
844 compute_instances.insert(user_cluster.name.clone(), snapshot);
845 }
846 }
847
848 let source_ids_for_closure = source_ids.clone();
849 let optimization_future = mz_ore::task::spawn_blocking(
850 || "optimize peek",
851 move || {
852 span.in_scope(|| {
853 let _dispatch_guard = explain_ctx.dispatch_guard();
854
855 let raw_expr = select_plan.source.clone();
856
857 // The purpose of wrapping the following in a closure is to control where the
858 // `?`s return from, so that even when a `catch_unwind_optimize` call fails,
859 // we can still handle `EXPLAIN BROKEN`.
860 let pipeline = || -> Result<
861 Either<
862 optimize::peek::GlobalLirPlan,
863 optimize::copy_to::GlobalLirPlan,
864 >,
865 OptimizerError,
866 > {
867 match optimizer.as_mut() {
868 Either::Left(optimizer) => {
869 // SELECT/EXPLAIN path
870 // HIR ⇒ MIR lowering and MIR optimization (local)
871 let local_mir_plan =
872 optimizer.catch_unwind_optimize(raw_expr.clone())?;
873 // Attach resolved context required to continue the pipeline.
874 let local_mir_plan = local_mir_plan.resolve(
875 timestamp_context.clone(),
876 &session_meta,
877 stats,
878 );
879 // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
880 let global_lir_plan =
881 optimizer.catch_unwind_optimize(local_mir_plan)?;
882 Ok(Either::Left(global_lir_plan))
883 }
884 Either::Right(optimizer) => {
885 // COPY TO path
886 // HIR ⇒ MIR lowering and MIR optimization (local)
887 let local_mir_plan =
888 optimizer.catch_unwind_optimize(raw_expr.clone())?;
889 // Attach resolved context required to continue the pipeline.
890 let local_mir_plan = local_mir_plan.resolve(
891 timestamp_context.clone(),
892 &session_meta,
893 stats,
894 );
895 // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
896 let global_lir_plan =
897 optimizer.catch_unwind_optimize(local_mir_plan)?;
898 Ok(Either::Right(global_lir_plan))
899 }
900 }
901 };
902
903 let global_lir_plan_result = pipeline();
904 let optimization_finished_at = now();
905
906 let create_insights_ctx =
907 |optimizer: &optimize::peek::Optimizer,
908 is_notice: bool|
909 -> Option<Box<PlanInsightsContext>> {
910 if !needs_plan_insights {
911 return None;
912 }
913
914 let catalog = catalog_for_insights.as_ref()?;
915
916 let enable_re_optimize = if needs_plan_insights {
917 // Disable any plan insights that use the optimizer if we only want the
918 // notice and plan optimization took longer than the threshold. This is
919 // to prevent a situation where optimizing takes a while and there are
920 // lots of clusters, which would delay peek execution by the product of
921 // those.
922 //
923 // (This heuristic doesn't work well, see #9492.)
924 let dyncfgs = catalog.system_config().dyncfgs();
925 let opt_limit = mz_adapter_types::dyncfgs
926 ::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
927 .get(dyncfgs);
928 !(is_notice && optimizer.duration() > opt_limit)
929 } else {
930 false
931 };
932
933 Some(Box::new(PlanInsightsContext {
934 stmt: select_plan
935 .select
936 .as_deref()
937 .map(Clone::clone)
938 .map(Statement::Select),
939 raw_expr: raw_expr.clone(),
940 catalog: Arc::clone(catalog),
941 compute_instances,
942 target_instance: target_cluster_name,
943 metrics: optimizer.metrics().clone(),
944 finishing: optimizer.finishing().clone(),
945 optimizer_config: optimizer.config().clone(),
946 session: session_meta,
947 timestamp_context,
948 view_id: optimizer.select_id(),
949 index_id: optimizer.index_id(),
950 enable_re_optimize,
951 }))
952 };
953
954 match global_lir_plan_result {
955 Ok(Either::Left(global_lir_plan)) => {
956 // SELECT/EXPLAIN path
957 let optimizer = optimizer.unwrap_left();
958 match explain_ctx {
959 ExplainContext::Plan(explain_ctx) => {
960 let (_, df_meta, _) = global_lir_plan.unapply();
961 let insights_ctx = create_insights_ctx(&optimizer, false);
962 Ok(Execution::ExplainPlan {
963 df_meta,
964 explain_ctx,
965 optimizer,
966 insights_ctx,
967 })
968 }
969 ExplainContext::None => Ok(Execution::Peek {
970 global_lir_plan,
971 optimization_finished_at,
972 plan_insights_optimizer_trace: None,
973 insights_ctx: None,
974 }),
975 ExplainContext::PlanInsightsNotice(optimizer_trace) => {
976 let insights_ctx = create_insights_ctx(&optimizer, true);
977 Ok(Execution::Peek {
978 global_lir_plan,
979 optimization_finished_at,
980 plan_insights_optimizer_trace: Some(optimizer_trace),
981 insights_ctx,
982 })
983 }
984 ExplainContext::Pushdown => {
985 let (plan, _, _) = global_lir_plan.unapply();
986 let imports = match plan {
987 PeekPlan::SlowPath(plan) => plan
988 .desc
989 .source_imports
990 .into_iter()
991 .filter_map(|(id, import)| {
992 import.desc.arguments.operators.map(|mfp| (id, mfp))
993 })
994 .collect(),
995 PeekPlan::FastPath(_) => {
996 std::collections::BTreeMap::default()
997 }
998 };
999 Ok(Execution::ExplainPushdown {
1000 imports,
1001 determination: determination_for_pushdown
1002 .expect("it's present for the ExplainPushdown case"),
1003 })
1004 }
1005 }
1006 }
1007 Ok(Either::Right(global_lir_plan)) => {
1008 // COPY TO S3 path
1009 Ok(Execution::CopyToS3 {
1010 global_lir_plan,
1011 source_ids: source_ids_for_closure,
1012 })
1013 }
1014 Err(err) => {
1015 if optimizer.is_right() {
1016 // COPY TO has no EXPLAIN BROKEN support
1017 return Err(err);
1018 }
1019 // SELECT/EXPLAIN error handling
1020 let optimizer = optimizer.expect_left("checked above");
1021 if let ExplainContext::Plan(explain_ctx) = explain_ctx {
1022 if explain_ctx.broken {
1023 // EXPLAIN BROKEN: log error and continue with defaults
1024 tracing::error!(
1025 "error while handling EXPLAIN statement: {}",
1026 err
1027 );
1028 Ok(Execution::ExplainPlan {
1029 df_meta: Default::default(),
1030 explain_ctx,
1031 optimizer,
1032 insights_ctx: None,
1033 })
1034 } else {
1035 Err(err)
1036 }
1037 } else {
1038 Err(err)
1039 }
1040 }
1041 }
1042 })
1043 },
1044 );
1045 let optimization_timeout = *session.vars().statement_timeout();
1046 let optimization_result =
1047 // Note: spawn_blocking tasks cannot be cancelled, so on timeout we stop waiting but the
1048 // optimization task continues running in the background until completion. See
1049 // https://github.com/MaterializeInc/database-issues/issues/8644 for properly cancelling
1050 // optimizer runs.
1051 match tokio::time::timeout(optimization_timeout, optimization_future).await {
1052 Ok(Ok(result)) => result,
1053 Ok(Err(optimizer_error)) => {
1054 return Err(AdapterError::Internal(format!(
1055 "internal error in optimizer: {}",
1056 optimizer_error
1057 )));
1058 }
1059 Err(_elapsed) => {
1060 warn!("optimize peek timed out after {:?}", optimization_timeout);
1061 return Err(AdapterError::StatementTimeout);
1062 }
1063 };
1064
1065 // Log optimization finished
1066 if let Some(logging_id) = &statement_logging_id {
1067 self.log_lifecycle_event(*logging_id, StatementLifecycleEvent::OptimizationFinished);
1068 }
1069
1070 // Assert that read holds are correct for the execution plan
1071 Self::assert_read_holds_correct(
1072 &read_holds,
1073 &optimization_result,
1074 &determination,
1075 target_cluster_id,
1076 in_immediate_multi_stmt_txn,
1077 );
1078
1079 // Handle the optimization result: either generate EXPLAIN output or continue with execution
1080 match optimization_result {
1081 Execution::ExplainPlan {
1082 df_meta,
1083 explain_ctx,
1084 optimizer,
1085 insights_ctx,
1086 } => {
1087 let rows = coord::sequencer::explain_plan_inner(
1088 session,
1089 &catalog,
1090 df_meta,
1091 explain_ctx,
1092 optimizer,
1093 insights_ctx,
1094 )
1095 .await?;
1096
1097 Ok(Some(ExecuteResponse::SendingRowsImmediate {
1098 rows: Box::new(rows.into_row_iter()),
1099 }))
1100 }
1101 Execution::ExplainPushdown {
1102 imports,
1103 determination,
1104 } => {
1105 // # From peek_explain_pushdown
1106
1107 let as_of = determination.timestamp_context.antichain();
1108 let mz_now = determination
1109 .timestamp_context
1110 .timestamp()
1111 .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1112 .unwrap_or_else(ResultSpec::value_all);
1113
1114 Ok(Some(
1115 coord::sequencer::explain_pushdown_future_inner(
1116 session,
1117 &*catalog,
1118 &self.storage_collections,
1119 as_of,
1120 mz_now,
1121 imports,
1122 )
1123 .await
1124 .await?,
1125 ))
1126 }
1127 Execution::Peek {
1128 global_lir_plan,
1129 optimization_finished_at: _optimization_finished_at,
1130 plan_insights_optimizer_trace,
1131 insights_ctx,
1132 } => {
1133 // Continue with normal execution
1134 // # From peek_finish
1135
1136 // The typ here was generated from the HIR SQL type and simply stored in LIR.
1137 let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
1138
1139 coord::sequencer::emit_optimizer_notices(
1140 &*catalog,
1141 session,
1142 &df_meta.optimizer_notices,
1143 );
1144
1145 // Generate plan insights notice if needed
1146 if let Some(trace) = plan_insights_optimizer_trace {
1147 let target_cluster = catalog.get_cluster(target_cluster_id);
1148 let features = OptimizerFeatures::from(catalog.system_config())
1149 .override_from(&target_cluster.config.features());
1150 let insights = trace
1151 .into_plan_insights(
1152 &features,
1153 &catalog.for_session(session),
1154 Some(select_plan.finishing.clone()),
1155 Some(target_cluster),
1156 df_meta.clone(),
1157 insights_ctx,
1158 )
1159 .await?;
1160 session.add_notice(AdapterNotice::PlanInsights(insights));
1161 }
1162
1163 // # Now back to peek_finish
1164
1165 let watch_set = statement_logging_id.map(|logging_id| {
1166 WatchSetCreation::new(
1167 logging_id,
1168 catalog.state(),
1169 &input_id_bundle,
1170 determination.timestamp_context.timestamp_or_default(),
1171 )
1172 });
1173
1174 let max_result_size = catalog.system_config().max_result_size();
1175
1176 // Clone determination if we need it for emit_timestamp_notice, since it may be
1177 // moved into Command::ExecuteSlowPathPeek.
1178 let determination_for_notice = if session.vars().emit_timestamp_notice() {
1179 Some(determination.clone())
1180 } else {
1181 None
1182 };
1183
1184 let response = match peek_plan {
1185 PeekPlan::FastPath(fast_path_plan) => {
1186 if let Some(logging_id) = &statement_logging_id {
1187 // TODO(peek-seq): Actually, we should log it also for
1188 // FastPathPlan::Constant. The only reason we are not doing so at the
1189 // moment is to match the old peek sequencing, so that statement logging
1190 // tests pass with the frontend peek sequencing turned both on and off.
1191 //
1192 // When the old sequencing is removed, we should make a couple of
1193 // changes in how we log timestamps:
1194 // - Move this up to just after timestamp determination, so that it
1195 // appears in the log as soon as possible.
1196 // - Do it also for Constant peeks.
1197 // - Currently, slow-path peeks' timestamp logging is done by
1198 // `implement_peek_plan`. We could remove it from there, and just do
1199 // it here.
1200 if !matches!(fast_path_plan, FastPathPlan::Constant(..)) {
1201 self.log_set_timestamp(
1202 *logging_id,
1203 determination.timestamp_context.timestamp_or_default(),
1204 );
1205 }
1206 }
1207
1208 let row_set_finishing_seconds =
1209 session.metrics().row_set_finishing_seconds().clone();
1210
1211 let peek_stash_read_batch_size_bytes =
1212 mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
1213 .get(catalog.system_config().dyncfgs());
1214 let peek_stash_read_memory_budget_bytes =
1215 mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
1216 .get(catalog.system_config().dyncfgs());
1217
1218 self.implement_fast_path_peek_plan(
1219 fast_path_plan,
1220 determination.timestamp_context.timestamp_or_default(),
1221 select_plan.finishing,
1222 target_cluster_id,
1223 target_replica,
1224 typ,
1225 max_result_size,
1226 max_query_result_size,
1227 row_set_finishing_seconds,
1228 read_holds,
1229 peek_stash_read_batch_size_bytes,
1230 peek_stash_read_memory_budget_bytes,
1231 session.conn_id().clone(),
1232 source_ids,
1233 watch_set,
1234 )
1235 .await?
1236 }
1237 PeekPlan::SlowPath(dataflow_plan) => {
1238 if let Some(logging_id) = &statement_logging_id {
1239 self.log_set_transient_index_id(*logging_id, dataflow_plan.id);
1240 }
1241
1242 self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
1243 dataflow_plan: Box::new(dataflow_plan),
1244 determination,
1245 finishing: select_plan.finishing,
1246 compute_instance: target_cluster_id,
1247 target_replica,
1248 intermediate_result_type: typ,
1249 source_ids,
1250 conn_id: session.conn_id().clone(),
1251 max_result_size,
1252 max_query_result_size,
1253 watch_set,
1254 tx,
1255 })
1256 .await?
1257 }
1258 };
1259
1260 // Add timestamp notice if emit_timestamp_notice is enabled
1261 if let Some(determination) = determination_for_notice {
1262 let explanation = self
1263 .call_coordinator(|tx| Command::ExplainTimestamp {
1264 conn_id: session.conn_id().clone(),
1265 session_wall_time: session.pcx().wall_time,
1266 cluster_id: target_cluster_id,
1267 id_bundle: input_id_bundle.clone(),
1268 determination,
1269 tx,
1270 })
1271 .await;
1272 session.add_notice(AdapterNotice::QueryTimestamp { explanation });
1273 }
1274
1275 Ok(Some(match select_plan.copy_to {
1276 None => response,
1277 // COPY TO STDOUT
1278 Some(format) => ExecuteResponse::CopyTo {
1279 format,
1280 resp: Box::new(response),
1281 },
1282 }))
1283 }
1284 Execution::CopyToS3 {
1285 global_lir_plan,
1286 source_ids,
1287 } => {
1288 let (df_desc, df_meta) = global_lir_plan.unapply();
1289
1290 coord::sequencer::emit_optimizer_notices(
1291 &*catalog,
1292 session,
1293 &df_meta.optimizer_notices,
1294 );
1295
1296 // Extract S3 sink connection info for preflight check
1297 let sink_id = df_desc.sink_id();
1298 let sinks = &df_desc.sink_exports;
1299 if sinks.len() != 1 {
1300 return Err(AdapterError::Internal(
1301 "expected exactly one copy to s3 sink".into(),
1302 ));
1303 }
1304 let (_, sink_desc) = sinks
1305 .first_key_value()
1306 .expect("known to be exactly one copy to s3 sink");
1307 let s3_sink_connection = match &sink_desc.connection {
1308 mz_compute_types::sinks::ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1309 conn.clone()
1310 }
1311 _ => {
1312 return Err(AdapterError::Internal(
1313 "expected copy to s3 oneshot sink".into(),
1314 ));
1315 }
1316 };
1317
1318 // Perform S3 preflight check in background task (via coordinator).
1319 // This runs slow S3 operations without blocking the coordinator's main task.
1320 self.call_coordinator(|tx| Command::CopyToPreflight {
1321 s3_sink_connection,
1322 sink_id,
1323 tx,
1324 })
1325 .await?;
1326
1327 // Preflight succeeded, now execute the actual COPY TO dataflow
1328 let watch_set = statement_logging_id.map(|logging_id| {
1329 WatchSetCreation::new(
1330 logging_id,
1331 catalog.state(),
1332 &input_id_bundle,
1333 determination.timestamp_context.timestamp_or_default(),
1334 )
1335 });
1336
1337 let response = self
1338 .call_coordinator(|tx| Command::ExecuteCopyTo {
1339 df_desc: Box::new(df_desc),
1340 compute_instance: target_cluster_id,
1341 target_replica,
1342 source_ids,
1343 conn_id: session.conn_id().clone(),
1344 watch_set,
1345 tx,
1346 })
1347 .await?;
1348
1349 Ok(Some(response))
1350 }
1351 }
1352 }
1353
    /// (Similar to Coordinator::determine_timestamp)
    /// Determines the timestamp for a query, acquires read holds that ensure the
    /// query remains executable at that time, and returns those.
    /// The caller is responsible for eventually dropping those read holds.
    ///
    /// Also increments timestamp-determination metrics and, for strict
    /// serializable queries that cannot respond immediately, records how far
    /// the chosen timestamp is ahead of what Serializable isolation would have
    /// picked.
    ///
    /// Note: self is taken &mut because of the lazy fetching in `get_compute_instance_client`.
    pub(crate) async fn frontend_determine_timestamp(
        &mut self,
        session: &Session,
        id_bundle: &CollectionIdBundle,
        when: &QueryWhen,
        compute_instance: ComputeInstanceId,
        timeline_context: &TimelineContext,
        oracle_read_ts: Option<Timestamp>,
        real_time_recency_ts: Option<Timestamp>,
    ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
        // this is copy-pasted from Coordinator

        let isolation_level = session.vars().transaction_isolation();

        // Acquire read holds on everything in `id_bundle` _before_ choosing a
        // timestamp, so the chosen time cannot be compacted away underneath
        // us. `upper` is the least valid write frontier of the bundle.
        let (read_holds, upper) = self
            .acquire_read_holds_and_least_valid_write(id_bundle)
            .await
            .map_err(|err| {
                // A lookup failure here means a dependency collection was
                // concurrently dropped; surface that as such.
                AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
                    err,
                    compute_instance,
                )
            })?;
        // Pick the timestamp. This may downgrade the passed-in read holds to
        // the chosen time, which is why they are threaded through and returned.
        let (det, read_holds) = <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
            session,
            id_bundle,
            when,
            timeline_context,
            oracle_read_ts,
            real_time_recency_ts,
            isolation_level,
            read_holds,
            upper.clone(),
        )?;

        // Count determinations, labeled by whether we can respond without
        // waiting, the isolation level, and the target cluster.
        session
            .metrics()
            .determine_timestamp(&[
                match det.respond_immediately() {
                    true => "true",
                    false => "false",
                },
                isolation_level.as_str(),
                &compute_instance.to_string(),
            ])
            .inc();
        if !det.respond_immediately()
            && isolation_level == &IsolationLevel::StrictSerializable
            && real_time_recency_ts.is_none()
        {
            // Note down the difference between StrictSerializable and Serializable into a metric.
            if let Some(strict) = det.timestamp_context.timestamp() {
                // Re-run the determination as if under Serializable isolation,
                // purely for comparison; the resulting holds are dropped
                // immediately.
                let (serializable_det, _tmp_read_holds) =
                    <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
                        session,
                        id_bundle,
                        when,
                        timeline_context,
                        oracle_read_ts,
                        real_time_recency_ts,
                        &IsolationLevel::Serializable,
                        read_holds.clone(),
                        upper,
                    )?;
                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
                    session
                        .metrics()
                        .timestamp_difference_for_strict_serializable_ms(&[compute_instance
                            .to_string()
                            .as_ref()])
                        .observe(f64::cast_lossy(u64::from(
                            strict.saturating_sub(*serializable),
                        )));
                }
            }
        }

        Ok((det, read_holds))
    }
1439
1440 fn assert_read_holds_correct(
1441 read_holds: &ReadHolds<Timestamp>,
1442 execution: &Execution,
1443 determination: &TimestampDetermination<Timestamp>,
1444 target_cluster_id: ClusterId,
1445 in_immediate_multi_stmt_txn: bool,
1446 ) {
1447 // Extract source_imports, index_imports, as_of, and execution_name based on Execution variant
1448 let (source_imports, index_imports, as_of, execution_name): (
1449 Vec<GlobalId>,
1450 Vec<GlobalId>,
1451 Timestamp,
1452 &str,
1453 ) = match execution {
1454 Execution::Peek {
1455 global_lir_plan, ..
1456 } => match global_lir_plan.peek_plan() {
1457 PeekPlan::FastPath(fast_path_plan) => {
1458 let (sources, indexes) = match fast_path_plan {
1459 FastPathPlan::Constant(..) => (vec![], vec![]),
1460 FastPathPlan::PeekExisting(_coll_id, idx_id, ..) => (vec![], vec![*idx_id]),
1461 FastPathPlan::PeekPersist(global_id, ..) => (vec![*global_id], vec![]),
1462 };
1463 (
1464 sources,
1465 indexes,
1466 determination.timestamp_context.timestamp_or_default(),
1467 "FastPath",
1468 )
1469 }
1470 PeekPlan::SlowPath(dataflow_plan) => {
1471 let as_of = dataflow_plan
1472 .desc
1473 .as_of
1474 .clone()
1475 .expect("dataflow has an as_of")
1476 .into_element();
1477 (
1478 dataflow_plan.desc.source_imports.keys().cloned().collect(),
1479 dataflow_plan.desc.index_imports.keys().cloned().collect(),
1480 as_of,
1481 "SlowPath",
1482 )
1483 }
1484 },
1485 Execution::CopyToS3 {
1486 global_lir_plan, ..
1487 } => {
1488 let df_desc = global_lir_plan.df_desc();
1489 let as_of = df_desc
1490 .as_of
1491 .clone()
1492 .expect("dataflow has an as_of")
1493 .into_element();
1494 (
1495 df_desc.source_imports.keys().cloned().collect(),
1496 df_desc.index_imports.keys().cloned().collect(),
1497 as_of,
1498 "CopyToS3",
1499 )
1500 }
1501 Execution::ExplainPlan { .. } | Execution::ExplainPushdown { .. } => {
1502 // No read holds assertions needed for EXPLAIN variants
1503 return;
1504 }
1505 };
1506
1507 // Assert that we have some read holds for all the imports of the dataflow.
1508 for id in source_imports.iter() {
1509 soft_assert_or_log!(
1510 read_holds.storage_holds.contains_key(id),
1511 "[{}] missing read hold for the source import {}; (in_immediate_multi_stmt_txn: {})",
1512 execution_name,
1513 id,
1514 in_immediate_multi_stmt_txn,
1515 );
1516 }
1517 for id in index_imports.iter() {
1518 soft_assert_or_log!(
1519 read_holds
1520 .compute_ids()
1521 .map(|(_instance, coll)| coll)
1522 .contains(id),
1523 "[{}] missing read hold for the index import {}; (in_immediate_multi_stmt_txn: {})",
1524 execution_name,
1525 id,
1526 in_immediate_multi_stmt_txn,
1527 );
1528 }
1529
1530 // Also check the holds against the as_of.
1531 for (id, h) in read_holds.storage_holds.iter() {
1532 soft_assert_or_log!(
1533 h.since().less_equal(&as_of),
1534 "[{}] storage read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1535 execution_name,
1536 h.since(),
1537 id,
1538 as_of,
1539 determination,
1540 in_immediate_multi_stmt_txn,
1541 );
1542 }
1543 for ((instance, id), h) in read_holds.compute_holds.iter() {
1544 soft_assert_eq_or_log!(
1545 *instance,
1546 target_cluster_id,
1547 "[{}] the read hold on {} is on the wrong cluster; (in_immediate_multi_stmt_txn: {})",
1548 execution_name,
1549 id,
1550 in_immediate_multi_stmt_txn,
1551 );
1552 soft_assert_or_log!(
1553 h.since().less_equal(&as_of),
1554 "[{}] compute read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}; (in_immediate_multi_stmt_txn: {})",
1555 execution_name,
1556 h.since(),
1557 id,
1558 as_of,
1559 determination,
1560 in_immediate_multi_stmt_txn,
1561 );
1562 }
1563 }
1564}
1565
/// Enum for branching among various execution steps after optimization.
///
/// The blocking optimization task returns one of these, and the caller then
/// dispatches on the variant to either execute the query or render the
/// requested `EXPLAIN` output.
enum Execution {
    /// Execute a SELECT peek (fast path or slow path).
    Peek {
        global_lir_plan: optimize::peek::GlobalLirPlan,
        /// When optimization finished, as reported by the catalog's `now`
        /// clock inside the optimization task.
        optimization_finished_at: EpochMillis,
        /// Present when a plan-insights notice was requested; the trace is
        /// converted into insights and attached to the session as a notice.
        plan_insights_optimizer_trace: Option<OptimizerTrace>,
        /// Extra context for computing plan insights, if they were requested.
        insights_ctx: Option<Box<PlanInsightsContext>>,
    },
    /// Execute a `COPY ... TO` S3 oneshot sink dataflow (after an S3
    /// preflight check via the coordinator).
    CopyToS3 {
        global_lir_plan: optimize::copy_to::GlobalLirPlan,
        /// Source collection ids, forwarded to `Command::ExecuteCopyTo`.
        source_ids: BTreeSet<GlobalId>,
    },
    /// Render `EXPLAIN <PLAN>` output for the optimized (or, for
    /// `EXPLAIN BROKEN`, failed) plan.
    ExplainPlan {
        df_meta: DataflowMetainfo,
        explain_ctx: ExplainPlanContext,
        /// The optimizer that produced the plan; its trace is used to render
        /// the explanation.
        optimizer: optimize::peek::Optimizer,
        insights_ctx: Option<Box<PlanInsightsContext>>,
    },
    /// Render `EXPLAIN FILTER PUSHDOWN` output.
    ExplainPushdown {
        /// For slow-path plans, each source import's pushed-down MFP
        /// operators; empty for fast-path plans.
        imports: BTreeMap<GlobalId, mz_expr::MapFilterProject>,
        /// The timestamp determination the pushdown explanation is evaluated
        /// against.
        determination: TimestampDetermination<Timestamp>,
    },
}