mz_adapter/frontend_peek.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::collections::BTreeSet;
12use std::sync::Arc;
13
14use itertools::{Either, Itertools};
15use mz_adapter_types::dyncfgs::CONSTRAINT_BASED_TIMESTAMP_SELECTION;
16use mz_adapter_types::timestamp_selection::ConstraintBasedTimestampSelection;
17use mz_compute_types::ComputeInstanceId;
18use mz_expr::{CollectionPlan, ResultSpec};
19use mz_ore::cast::{CastFrom, CastLossy};
20use mz_ore::collections::CollectionExt;
21use mz_ore::now::EpochMillis;
22use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
23use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
24use mz_repr::role_id::RoleId;
25use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
26use mz_sql::ast::Raw;
27use mz_sql::catalog::CatalogCluster;
28use mz_sql::plan::Params;
29use mz_sql::plan::{self, Plan, QueryWhen};
30use mz_sql::rbac;
31use mz_sql::session::metadata::SessionMetadata;
32use mz_sql::session::vars::IsolationLevel;
33use mz_sql_parser::ast::{CopyDirection, CopyRelation, ExplainStage, ShowStatement, Statement};
34use mz_transform::EmptyStatisticsOracle;
35use mz_transform::dataflow::DataflowMetainfo;
36use opentelemetry::trace::TraceContextExt;
37use tracing::{Span, debug};
38use tracing_opentelemetry::OpenTelemetrySpanExt;
39
40use crate::catalog::{Catalog, CatalogState};
41use crate::command::Command;
42use crate::coord::peek::{FastPathPlan, PeekPlan};
43use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle};
44use crate::coord::timeline::timedomain_for;
45use crate::coord::timestamp_selection::TimestampDetermination;
46use crate::coord::{
47 Coordinator, CopyToContext, ExecuteContextGuard, ExplainContext, ExplainPlanContext,
48 TargetCluster,
49};
50use crate::explain::insights::PlanInsightsContext;
51use crate::explain::optimizer_trace::OptimizerTrace;
52use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
53use crate::optimize::{Optimize, OptimizerError};
54use crate::session::{Session, TransactionOps, TransactionStatus};
55use crate::statement_logging::WatchSetCreation;
56use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
57use crate::{
58 AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, PeekClient, ReadHolds,
59 TimelineContext, TimestampContext, TimestampProvider, optimize,
60};
61use crate::{coord, metrics};
62
63impl PeekClient {
64 /// Attempt to sequence a peek from the session task.
65 ///
66 /// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the
67 /// Coordinator's sequencing. Errors should be returned to the user rather than triggering a fallback.
68 ///
69 /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
70 /// triggering the execution of the underlying query.
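///
/// A hypothetical call-site sketch (illustrative only; `send_to_client` and
/// `fall_back_to_old_sequencing` are placeholders, not APIs in this crate):
/// ```ignore
/// match peek_client.try_frontend_peek(portal_name, session, &mut outer_ctx_extra).await? {
///     Some(response) => send_to_client(response),
///     None => fall_back_to_old_sequencing(),
/// }
/// ```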
71 pub(crate) async fn try_frontend_peek(
72 &mut self,
73 portal_name: &str,
74 session: &mut Session,
75 outer_ctx_extra: &mut Option<ExecuteContextGuard>,
76 ) -> Result<Option<ExecuteResponse>, AdapterError> {
77 // # From handle_execute
78
79 if session.vars().emit_trace_id_notice() {
80 let span_context = tracing::Span::current()
81 .context()
82 .span()
83 .span_context()
84 .clone();
85 if span_context.is_valid() {
86 session.add_notice(AdapterNotice::QueryTrace {
87 trace_id: span_context.trace_id(),
88 });
89 }
90 }
91
92 // TODO(peek-seq): This snapshot is wasted when we end up bailing out from the frontend peek
93 // sequencing. We could solve this with the optimization where we continuously keep a
94 // catalog snapshot in the session, and only fetch a new one when the catalog revision has
95 // changed, which we could detect with an atomic read.
96 // In any case, this problem will go away once we reach the point where we never fall back
97 // to the old sequencing.
98 let catalog = self.catalog_snapshot("try_frontend_peek").await;
99
100 // Extract things from the portal.
101 let (stmt, params, logging, lifecycle_timestamps) = {
102 if let Err(err) = Coordinator::verify_portal(&*catalog, session, portal_name) {
103 outer_ctx_extra
104 .take()
105 .and_then(|guard| guard.defuse().retire());
106 return Err(err);
107 }
108 let portal = session
109 .get_portal_unverified(portal_name)
110 // The portal is a session-level thing, so it couldn't have concurrently disappeared
111 // since the above verification.
112 .expect("called verify_portal above");
113 let params = portal.parameters.clone();
114 let stmt = portal.stmt.clone();
115 let logging = Arc::clone(&portal.logging);
116 let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
117 (stmt, params, logging, lifecycle_timestamps)
118 };
119
120 // Before planning, check if this is a statement type we can handle.
121 // This must happen BEFORE statement logging setup to avoid orphaned execution records.
122 if let Some(ref stmt) = stmt {
123 match &**stmt {
124 Statement::Select(_)
125 | Statement::ExplainAnalyzeObject(_)
126 | Statement::ExplainAnalyzeCluster(_)
127 | Statement::Show(ShowStatement::ShowObjects(_))
128 | Statement::Show(ShowStatement::ShowColumns(_)) => {
129 // These are always fine, just continue.
130 // Note: EXPLAIN ANALYZE will `plan` to `Plan::Select`.
131 // Note: ShowObjects plans to `Plan::Select`, ShowColumns plans to `Plan::ShowColumns`.
132 // We handle `Plan::ShowColumns` specially in `try_frontend_peek_inner`.
133 }
134 Statement::ExplainPlan(explain_stmt) => {
135 // Only handle ExplainPlan for SELECT statements.
136 // We don't want to handle e.g. EXPLAIN CREATE MATERIALIZED VIEW here, because that
137 // requires purification before planning, which the frontend peek sequencing doesn't
138 // do.
139 match &explain_stmt.explainee {
140 mz_sql_parser::ast::Explainee::Select(..) => {
141 // This is a SELECT, continue
142 }
143 _ => {
144 debug!(
145 "Bailing out from try_frontend_peek, because EXPLAIN is not for a SELECT query"
146 );
147 return Ok(None);
148 }
149 }
150 }
151 Statement::ExplainPushdown(explain_stmt) => {
152 // Only handle EXPLAIN FILTER PUSHDOWN for non-BROKEN SELECT statements
153 match &explain_stmt.explainee {
154 mz_sql_parser::ast::Explainee::Select(_, false) => {}
155 _ => {
156 debug!(
157 "Bailing out from try_frontend_peek, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is for EXPLAIN BROKEN"
158 );
159 return Ok(None);
160 }
161 }
162 }
163 Statement::Copy(copy_stmt) => {
164 match &copy_stmt.direction {
165 CopyDirection::To => {
166 // Check for SUBSCRIBE inside COPY TO - we don't handle Plan::Subscribe
167 if matches!(&copy_stmt.relation, CopyRelation::Subscribe(_)) {
168 debug!(
169 "Bailing out from try_frontend_peek, because COPY (SUBSCRIBE ...) TO is not supported"
170 );
171 return Ok(None);
172 }
173 // This is COPY TO (SELECT), continue
174 }
175 CopyDirection::From => {
176 debug!(
177 "Bailing out from try_frontend_peek, because COPY FROM is not supported"
178 );
179 return Ok(None);
180 }
181 }
182 }
183 _ => {
184 debug!(
185 "Bailing out from try_frontend_peek, because statement type is not supported"
186 );
187 return Ok(None);
188 }
189 }
190 }
191
192 // Set up statement logging, and log the beginning of execution.
193 // (But only if we're not executing in the context of another statement.)
194 let statement_logging_id = if outer_ctx_extra.is_none() {
195 // This is a new statement, so begin statement logging
196 let result = self.statement_logging_frontend.begin_statement_execution(
197 session,
198 &params,
199 &logging,
200 catalog.system_config(),
201 lifecycle_timestamps,
202 );
203
204 if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result {
205 self.log_began_execution(began_execution, mseh_update, prepared_statement);
206 Some(logging_id)
207 } else {
208 None
209 }
210 } else {
211 // We're executing in the context of another statement (e.g., FETCH),
212 // so extract the statement logging ID from the outer context if present.
213 // We take ownership and retire the outer context here. The end of execution will be
214 // logged in one of the following ways:
215 // - At the end of this function, if the execution is finished by then.
216 // - Later by the Coordinator, either due to RegisterFrontendPeek or ExecuteSlowPathPeek.
217 outer_ctx_extra
218 .take()
219 .and_then(|guard| guard.defuse().retire())
220 };
221
222 let result = self
223 .try_frontend_peek_inner(session, catalog, stmt, params, statement_logging_id)
224 .await;
225
226 // Log the end of execution if we are logging this statement and execution has already
227 // ended.
228 if let Some(logging_id) = statement_logging_id {
229 let reason = match &result {
230 // Streaming results are handled asynchronously by the coordinator
231 Ok(Some(ExecuteResponse::SendingRowsStreaming { .. })) => {
232 // Don't log here - the peek is still executing.
233 // It will be logged when handle_peek_notification is called.
234 return result;
235 }
236 // COPY TO needs to check its inner response
237 Ok(Some(resp @ ExecuteResponse::CopyTo { resp: inner, .. })) => {
238 match inner.as_ref() {
239 ExecuteResponse::SendingRowsStreaming { .. } => {
240 // Don't log here - the peek is still executing.
241 // It will be logged when handle_peek_notification is called.
242 return result;
243 }
244 // For non-streaming COPY TO responses, use the outer CopyTo for conversion
245 _ => resp.into(),
246 }
247 }
248 // Bailout case, which should not happen
249 Ok(None) => {
250 soft_panic_or_log!(
251 "Bailed out from `try_frontend_peek_inner` after we already logged the beginning of statement execution."
252 );
253 // This statement will be handled by the old peek sequencing, which will do its
254 // own statement logging from the beginning. So, let's close out this one.
255 self.log_ended_execution(
256 logging_id,
257 StatementEndedExecutionReason::Errored {
258 error: "Internal error: bailed out from `try_frontend_peek_inner`"
259 .to_string(),
260 },
261 );
262 return result;
263 }
264 // All other success responses - use the From implementation
265 // TODO(peek-seq): After we delete the old peek sequencing, we'll be able to adjust
266 // the From implementation to do exactly what we need in the frontend peek
267 // sequencing, so that the above special cases won't be needed.
268 Ok(Some(resp)) => resp.into(),
269 Err(e) => StatementEndedExecutionReason::Errored {
270 error: e.to_string(),
271 },
272 };
273
274 self.log_ended_execution(logging_id, reason);
275 }
276
277 result
278 }
279
280 /// This is encapsulated in an inner function so that the outer function can still do statement
281 /// logging even when the inner function returns early via a `?`.
282 async fn try_frontend_peek_inner(
283 &mut self,
284 session: &mut Session,
285 catalog: Arc<Catalog>,
286 stmt: Option<Arc<Statement<Raw>>>,
287 params: Params,
288 statement_logging_id: Option<crate::statement_logging::StatementLoggingId>,
289 ) -> Result<Option<ExecuteResponse>, AdapterError> {
290 let stmt = match stmt {
291 Some(stmt) => stmt,
292 None => {
293 debug!("try_frontend_peek_inner succeeded on an empty query");
294 return Ok(Some(ExecuteResponse::EmptyQuery));
295 }
296 };
297
298 session
299 .metrics()
300 .query_total(&[
301 metrics::session_type_label_value(session.user()),
302 metrics::statement_type_label_value(&stmt),
303 ])
304 .inc();
305
306 // # From handle_execute_inner
307
308 let conn_catalog = catalog.for_session(session);
309 // (`resolved_ids` should be derivable from `stmt`. If `stmt` is later transformed to
310 // remove/add IDs, then `resolved_ids` should be updated to also remove/add those IDs.)
311 let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?;
312
313 let pcx = session.pcx();
314 let plan = mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, &params, &resolved_ids)?;
315
316 let (select_plan, explain_ctx, copy_to_ctx) = match &plan {
317 Plan::Select(select_plan) => {
318 let explain_ctx = if session.vars().emit_plan_insights_notice() {
319 let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
320 ExplainContext::PlanInsightsNotice(optimizer_trace)
321 } else {
322 ExplainContext::None
323 };
324 (select_plan, explain_ctx, None)
325 }
326 Plan::ShowColumns(show_columns_plan) => {
327 // ShowColumns wraps a SelectPlan; extract it and proceed as normal.
328 (&show_columns_plan.select_plan, ExplainContext::None, None)
329 }
330 Plan::ExplainPlan(plan::ExplainPlanPlan {
331 stage,
332 format,
333 config,
334 explainee:
335 plan::Explainee::Statement(plan::ExplaineeStatement::Select { broken, plan, desc }),
336 }) => {
337 // Create OptimizerTrace to collect optimizer plans
338 let optimizer_trace = OptimizerTrace::new(stage.paths());
339 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
340 broken: *broken,
341 config: config.clone(),
342 format: *format,
343 stage: *stage,
344 replan: None,
345 desc: Some(desc.clone()),
346 optimizer_trace,
347 });
348 (plan, explain_ctx, None)
349 }
350 // COPY TO S3
351 Plan::CopyTo(plan::CopyToPlan {
352 select_plan,
353 desc,
354 to,
355 connection,
356 connection_id,
357 format,
358 max_file_size,
359 }) => {
360 let uri = eval_copy_to_uri(to.clone(), session, catalog.state())?;
361
362 // (output_batch_count will be set later)
363 let copy_to_ctx = CopyToContext {
364 desc: desc.clone(),
365 uri,
366 connection: connection.clone(),
367 connection_id: *connection_id,
368 format: format.clone(),
369 max_file_size: *max_file_size,
370 output_batch_count: None,
371 };
372
373 (select_plan, ExplainContext::None, Some(copy_to_ctx))
374 }
375 Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => {
376 // Only handle EXPLAIN FILTER PUSHDOWN for SELECT statements
377 match explainee {
378 plan::Explainee::Statement(plan::ExplaineeStatement::Select {
379 broken: false,
380 plan,
381 desc: _,
382 }) => {
383 let explain_ctx = ExplainContext::Pushdown;
384 (plan, explain_ctx, None)
385 }
386 _ => {
387 // This shouldn't happen because we already checked for this at the AST
388 // level before calling `try_frontend_peek_inner`.
389 soft_panic_or_log!(
390 "unexpected EXPLAIN FILTER PUSHDOWN plan kind in frontend peek sequencing: {:?}",
391 explainee
392 );
393 debug!(
394 "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN"
395 );
396 return Ok(None);
397 }
398 }
399 }
400 Plan::SideEffectingFunc(sef_plan) => {
401 // Side-effecting functions need Coordinator state (e.g., active_conns),
402 // so delegate to the Coordinator via a Command.
403 // The RBAC check is performed in the Coordinator where active_conns is available.
404 let response = self
405 .call_coordinator(|tx| Command::ExecuteSideEffectingFunc {
406 plan: sef_plan.clone(),
407 conn_id: session.conn_id().clone(),
408 current_role: session.role_metadata().current_role,
409 tx,
410 })
411 .await?;
412 return Ok(Some(response));
413 }
414 _ => {
415 // This shouldn't happen because we already checked for this at the AST
416 // level before calling `try_frontend_peek_inner`.
417 soft_panic_or_log!(
418 "Unexpected plan kind in frontend peek sequencing: {:?}",
419 plan
420 );
421 debug!(
422 "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, side-effecting SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO S3"
423 );
424 return Ok(None);
425 }
426 };
427
428 // # From sequence_plan
429
430 // We have checked the plan kind above.
431 assert!(plan.allowed_in_read_only());
432
433 let target_cluster = match session.transaction().cluster() {
434 // Use the current transaction's cluster.
435 Some(cluster_id) => TargetCluster::Transaction(cluster_id),
436 // If there isn't a current cluster set for a transaction, then try to auto route.
437 None => {
438 coord::catalog_serving::auto_run_on_catalog_server(&conn_catalog, session, &plan)
439 }
440 };
441 let (cluster, target_cluster_id, target_cluster_name) = {
442 let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
443 (cluster, cluster.id, &cluster.name)
444 };
445
446 // Log cluster selection
447 if let Some(logging_id) = &statement_logging_id {
448 self.log_set_cluster(*logging_id, target_cluster_id);
449 }
450
451 coord::catalog_serving::check_cluster_restrictions(
452 target_cluster_name.as_str(),
453 &conn_catalog,
454 &plan,
455 )?;
456
457 rbac::check_plan(
458 &conn_catalog,
459 // We can't look at `active_conns` here, but that's ok, because the case that needs it
460 // was already handled above via `Command::ExecuteSideEffectingFunc`.
461 None::<fn(u32) -> Option<RoleId>>,
462 session,
463 &plan,
464 Some(target_cluster_id),
465 &resolved_ids,
466 )?;
467
468 if let Some((_, wait_future)) =
469 coord::appends::waiting_on_startup_appends(&*catalog, session, &plan)
470 {
471 wait_future.await;
472 }
473
474 let max_query_result_size = Some(session.vars().max_query_result_size());
475
476 // # From sequence_peek
477
478 // # From peek_validate
479
480 let compute_instance_snapshot =
481 ComputeInstanceSnapshot::new_without_collections(cluster.id());
482
483 let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config())
484 .override_from(&catalog.get_cluster(cluster.id()).config.features())
485 .override_from(&explain_ctx);
486
487 if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
488 return Err(AdapterError::NoClusterReplicasAvailable {
489 name: cluster.name.clone(),
490 is_managed: cluster.is_managed(),
491 });
492 }
493
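// Allocate transient IDs for the one-off view and index that the peek optimizer plans
// around; the first element of each returned pair is not needed here.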
494 let (_, view_id) = self.transient_id_gen.allocate_id();
495 let (_, index_id) = self.transient_id_gen.allocate_id();
496
497 let mut optimizer = if let Some(mut copy_to_ctx) = copy_to_ctx {
498 // COPY TO path: calculate output_batch_count and create copy_to optimizer
499 let worker_counts = cluster.replicas().map(|r| {
500 let loc = &r.config.location;
501 loc.workers().unwrap_or_else(|| loc.num_processes())
502 });
503 let max_worker_count = match worker_counts.max() {
504 Some(count) => u64::cast_from(count),
505 None => {
506 return Err(AdapterError::NoClusterReplicasAvailable {
507 name: cluster.name.clone(),
508 is_managed: cluster.is_managed(),
509 });
510 }
511 };
512 copy_to_ctx.output_batch_count = Some(max_worker_count);
513
514 Either::Right(optimize::copy_to::Optimizer::new(
515 Arc::clone(&catalog),
516 compute_instance_snapshot.clone(),
517 view_id,
518 copy_to_ctx,
519 optimizer_config,
520 self.optimizer_metrics.clone(),
521 ))
522 } else {
523 // SELECT/EXPLAIN path: create peek optimizer
524 Either::Left(optimize::peek::Optimizer::new(
525 Arc::clone(&catalog),
526 compute_instance_snapshot.clone(),
527 select_plan.finishing.clone(),
528 view_id,
529 index_id,
530 optimizer_config,
531 self.optimizer_metrics.clone(),
532 ))
533 };
534
535 let target_replica_name = session.vars().cluster_replica();
536 let mut target_replica = target_replica_name
537 .map(|name| {
538 cluster
539 .replica_id(name)
540 .ok_or(AdapterError::UnknownClusterReplica {
541 cluster_name: cluster.name.clone(),
542 replica_name: name.to_string(),
543 })
544 })
545 .transpose()?;
546
547 let source_ids = select_plan.source.depends_on();
548 // TODO(peek-seq): validate_timeline_context can be expensive in real scenarios (not in
549 // simple benchmarks), because it traverses transitive dependencies even of indexed views and
550 // materialized views (also traversing their MIR plans).
551 let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?;
552 if matches!(timeline_context, TimelineContext::TimestampIndependent)
553 && select_plan.source.contains_temporal()?
554 {
555 // If the source IDs are timestamp independent but the query contains temporal functions,
556 // then the timeline context needs to be upgraded to timestamp dependent. This is
557 // required because `source_ids` doesn't contain functions.
558 timeline_context = TimelineContext::TimestampDependent;
559 }
560
561 let notices = coord::sequencer::check_log_reads(
562 &catalog,
563 cluster,
564 &source_ids,
565 &mut target_replica,
566 session.vars(),
567 )?;
568 session.add_notices(notices);
569
570 // # From peek_linearize_timestamp
571
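// Determine whether this query needs a linearized read timestamp from the timestamp
// oracle (e.g., under Strict Serializable isolation) and, if so, fetch one.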
572 let isolation_level = session.vars().transaction_isolation().clone();
573 let timeline = Coordinator::get_timeline(&timeline_context);
574 let needs_linearized_read_ts =
575 Coordinator::needs_linearized_read_ts(&isolation_level, &select_plan.when);
576
577 let oracle_read_ts = match timeline {
578 Some(timeline) if needs_linearized_read_ts => {
579 let oracle = self.ensure_oracle(timeline).await?;
580 let oracle_read_ts = oracle.read_ts().await;
581 Some(oracle_read_ts)
582 }
583 Some(_) | None => None,
584 };
585
586 // # From peek_real_time_recency
587
588 let vars = session.vars();
589 let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
590 && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
591 && !session.contains_read_timestamp()
592 {
593 // Only call the coordinator when we actually need real-time recency
594 self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
595 source_ids: source_ids.clone(),
596 real_time_recency_timeout: *vars.real_time_recency_timeout(),
597 tx,
598 })
599 .await?
600 } else {
601 None
602 };
603
604 // # From peek_timestamp_read_hold
605
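// Build a dataflow builder over the cluster snapshot and compute the set of collections
// (e.g., indexes and storage collections) that is sufficient to answer the query.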
606 let dataflow_builder = DataflowBuilder::new(catalog.state(), compute_instance_snapshot);
607 let input_id_bundle = dataflow_builder.sufficient_collections(source_ids.clone());
608
609 // ## From sequence_peek_timestamp
610
611 // Warning: This will be false for AS OF queries, even if we are otherwise inside a
612 // multi-statement transaction. (It's also false for FreshestTableWrite, which currently
613 // occurs only for read-then-write queries; those can't be part of multi-statement
614 // transactions, so FreshestTableWrite doesn't matter here.)
615 //
616 // TODO(peek-seq): It's not totally clear to me what the intended semantics are for AS OF
617 // queries inside a transaction: We clearly can't use the transaction timestamp, but the old
618 // peek sequencing still does a timedomain validation. The new peek sequencing does not do
619 // timedomain validation for AS OF queries, which seems more natural. That said, it might
620 // be cleanest to simply disallow AS OF queries inside transactions.
621 let in_immediate_multi_stmt_txn = session
622 .transaction()
623 .in_immediate_multi_stmt_txn(&select_plan.when);
624
625 // Fetch or generate a timestamp for this query and fetch or acquire read holds.
626 let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
627 // Use the transaction's timestamp if it exists and this isn't an AS OF query.
628 // (`in_immediate_multi_stmt_txn` is false for AS OF queries.)
629 Some(
630 determination @ TimestampDetermination {
631 timestamp_context: TimestampContext::TimelineTimestamp { .. },
632 ..
633 },
634 ) if in_immediate_multi_stmt_txn => {
635 // This is a subsequent (non-AS OF, non-constant) query in a multi-statement
636 // transaction. We now:
637 // - Validate that the query only accesses collections within the transaction's
638 // timedomain (which we know from the stored read holds).
639 // - Use the transaction's stored timestamp determination.
640 // - Use the (relevant subset of the) transaction's read holds.
641
642 let txn_read_holds_opt = self
643 .call_coordinator(|tx| Command::GetTransactionReadHoldsBundle {
644 conn_id: session.conn_id().clone(),
645 tx,
646 })
647 .await;
648
649 if let Some(txn_read_holds) = txn_read_holds_opt {
650 let allowed_id_bundle = txn_read_holds.id_bundle();
651 let outside = input_id_bundle.difference(&allowed_id_bundle);
652
653 // Queries without a timestamp and timeline can belong to any existing timedomain.
654 if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
655 let valid_names =
656 allowed_id_bundle.resolve_names(&*catalog, session.conn_id());
657 let invalid_names = outside.resolve_names(&*catalog, session.conn_id());
658 return Err(AdapterError::RelationOutsideTimeDomain {
659 relations: invalid_names,
660 names: valid_names,
661 });
662 }
663
664 // Extract the subset of read holds for the collections this query accesses.
665 let read_holds = txn_read_holds.subset(&input_id_bundle);
666
667 (determination, read_holds)
668 } else {
669 // This should never happen: we're in a subsequent query of a multi-statement
670 // transaction (we have a transaction timestamp), but the coordinator has no
671 // transaction read holds stored. This indicates a bug in the transaction
672 // handling.
673 return Err(AdapterError::Internal(
674 "Missing transaction read holds for multi-statement transaction"
675 .to_string(),
676 ));
677 }
678 }
679 _ => {
680 // There is no timestamp determination yet for this transaction. Either:
681 // - We are not in a multi-statement transaction.
682 // - This is the first (non-AS OF) query in a multi-statement transaction.
683 // - This is an AS OF query.
684 // - This is a constant query (`TimestampContext::NoTimestamp`).
685
686 let timedomain_bundle;
687 let determine_bundle = if in_immediate_multi_stmt_txn {
688 // This is the first (non-AS OF) query in a multi-statement transaction.
689 // Determine a timestamp that will be valid for anything in any schema
690 // referenced by the first query.
691 timedomain_bundle = timedomain_for(
692 &*catalog,
693 &dataflow_builder,
694 &source_ids,
695 &timeline_context,
696 session.conn_id(),
697 target_cluster_id,
698 )?;
699 &timedomain_bundle
700 } else {
701 // Simply use the inputs of the current query.
702 &input_id_bundle
703 };
704 let (determination, read_holds) = self
705 .frontend_determine_timestamp(
706 catalog.state(),
707 session,
708 determine_bundle,
709 &select_plan.when,
710 target_cluster_id,
711 &timeline_context,
712 oracle_read_ts,
713 real_time_recency_ts,
714 )
715 .await?;
716
717 // If this is the first (non-AS OF) query in a multi-statement transaction, store
718 // the read holds in the coordinator, so subsequent queries can validate against
719 // them.
720 if in_immediate_multi_stmt_txn {
721 self.call_coordinator(|tx| Command::StoreTransactionReadHolds {
722 conn_id: session.conn_id().clone(),
723 read_holds: read_holds.clone(),
724 tx,
725 })
726 .await;
727 }
728
729 (determination, read_holds)
730 }
731 };
732
733 {
734 // Assert that we have a read hold for all the collections in our `input_id_bundle`.
735 for id in input_id_bundle.iter() {
736 let s = read_holds.storage_holds.contains_key(&id);
737 let c = read_holds
738 .compute_ids()
739 .map(|(_instance, coll)| coll)
740 .contains(&id);
741 soft_assert_or_log!(
742 s || c,
743 "missing read hold for collection {} in `input_id_bundle`",
744 id
745 );
746 }
747
748 // Assert that each part of the `input_id_bundle` corresponds to the right part of
749 // `read_holds`.
750 for id in input_id_bundle.storage_ids.iter() {
751 soft_assert_or_log!(
752 read_holds.storage_holds.contains_key(id),
753 "missing storage read hold for collection {} in `input_id_bundle`",
754 id
755 );
756 }
757 for id in input_id_bundle
758 .compute_ids
759 .iter()
760 .flat_map(|(_instance, colls)| colls)
761 {
762 soft_assert_or_log!(
763 read_holds
764 .compute_ids()
765 .map(|(_instance, coll)| coll)
766 .contains(id),
767 "missing compute read hold for collection {} in `input_id_bundle`",
768 id,
769 );
770 }
771 }
772
773 // (TODO(peek-seq): The below TODO is copied from the old peek sequencing. We should resolve
774 // this when we decide what to do with `AS OF` in transactions.)
775 // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
776 // arbitrary and we don't recall why we did it (possibly an error!). Change this to always
777 // set the transaction ops. Decide and document what our policy should be on AS OF queries.
778 // Maybe they shouldn't be allowed in transactions at all because it's hard to explain
779 // what's going on there. This should probably get a small design document.
780
781 // We only track the peeks in the session if the query doesn't use AS
782 // OF or we're inside an explicit transaction. The latter case is
783 // necessary to support PG's `BEGIN` semantics, whose behavior can
784 // depend on whether or not reads have occurred in the txn.
785 let requires_linearization = (&explain_ctx).into();
786 let mut transaction_determination = determination.clone();
787 if select_plan.when.is_transactional() {
788 session.add_transaction_ops(TransactionOps::Peeks {
789 determination: transaction_determination,
790 cluster_id: target_cluster_id,
791 requires_linearization,
792 })?;
793 } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
794 // If the query uses AS OF, then ignore the timestamp.
795 transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
796 session.add_transaction_ops(TransactionOps::Peeks {
797 determination: transaction_determination,
798 cluster_id: target_cluster_id,
799 requires_linearization,
800 })?;
801 };
802
803 // # From peek_optimize
804
805 let stats = statistics_oracle(
806 session,
807 &source_ids,
808 &determination.timestamp_context.antichain(),
809 true,
810 catalog.system_config(),
811 &*self.storage_collections,
812 )
813 .await
814 .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
815
816 // Generate data structures that can be moved to another task where we will perform possibly
817 // expensive optimizations.
818 let timestamp_context = determination.timestamp_context.clone();
819 let session_meta = session.meta();
820 let now = catalog.config().now.clone();
821 let select_plan = select_plan.clone();
822 let target_cluster_name = target_cluster_name.clone();
823 let needs_plan_insights = explain_ctx.needs_plan_insights();
824 let determination_for_pushdown = if matches!(explain_ctx, ExplainContext::Pushdown) {
825 // This is a hairy data structure, so avoid this clone if we are not in
826 // EXPLAIN FILTER PUSHDOWN.
827 Some(determination.clone())
828 } else {
829 None
830 };
831
832 let span = Span::current();
833
834 // Prepare data for plan insights if needed
835 let catalog_for_insights = if needs_plan_insights {
836 Some(Arc::clone(&catalog))
837 } else {
838 None
839 };
840 let mut compute_instances = BTreeMap::new();
841 if needs_plan_insights {
842 for user_cluster in catalog.user_clusters() {
843 let snapshot = ComputeInstanceSnapshot::new_without_collections(user_cluster.id);
844 compute_instances.insert(user_cluster.name.clone(), snapshot);
845 }
846 }
847
848 // Enum for branching among various execution steps after optimization
849 enum Execution {
850 Peek {
851 global_lir_plan: optimize::peek::GlobalLirPlan,
852 optimization_finished_at: EpochMillis,
853 plan_insights_optimizer_trace: Option<OptimizerTrace>,
854 insights_ctx: Option<Box<PlanInsightsContext>>,
855 },
856 CopyToS3 {
857 global_lir_plan: optimize::copy_to::GlobalLirPlan,
858 source_ids: BTreeSet<GlobalId>,
859 },
860 ExplainPlan {
861 df_meta: DataflowMetainfo,
862 explain_ctx: ExplainPlanContext,
863 optimizer: optimize::peek::Optimizer,
864 insights_ctx: Option<Box<PlanInsightsContext>>,
865 },
866 ExplainPushdown {
867 imports: BTreeMap<GlobalId, mz_expr::MapFilterProject>,
868 determination: TimestampDetermination<Timestamp>,
869 },
870 }
871
872 let source_ids_for_closure = source_ids.clone();
873 let optimization_result = mz_ore::task::spawn_blocking(
874 || "optimize peek",
875 move || {
876 span.in_scope(|| {
877 let _dispatch_guard = explain_ctx.dispatch_guard();
878
879 let raw_expr = select_plan.source.clone();
880
881 // The purpose of wrapping the following in a closure is to control where the
882 // `?`s return from, so that even when a `catch_unwind_optimize` call fails,
883 // we can still handle `EXPLAIN BROKEN`.
884 let pipeline = || -> Result<Either<optimize::peek::GlobalLirPlan, optimize::copy_to::GlobalLirPlan>, OptimizerError> {
885 match optimizer.as_mut() {
886 Either::Left(optimizer) => {
887 // SELECT/EXPLAIN path
888 // HIR ⇒ MIR lowering and MIR optimization (local)
889 let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr.clone())?;
890 // Attach resolved context required to continue the pipeline.
891 let local_mir_plan =
892 local_mir_plan.resolve(timestamp_context.clone(), &session_meta, stats);
893 // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
894 let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
895 Ok(Either::Left(global_lir_plan))
896 }
897 Either::Right(optimizer) => {
898 // COPY TO path
899 // HIR ⇒ MIR lowering and MIR optimization (local)
900 let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr.clone())?;
901 // Attach resolved context required to continue the pipeline.
902 let local_mir_plan =
903 local_mir_plan.resolve(timestamp_context.clone(), &session_meta, stats);
904 // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
905 let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
906 Ok(Either::Right(global_lir_plan))
907 }
908 }
909 };
910
911 let global_lir_plan_result = pipeline();
912 let optimization_finished_at = now();
913
914 let create_insights_ctx = |optimizer: &optimize::peek::Optimizer, is_notice: bool| -> Option<Box<PlanInsightsContext>> {
915 if !needs_plan_insights {
916 return None;
917 }
918
919 let catalog = catalog_for_insights.as_ref()?;
920
921 let enable_re_optimize = if needs_plan_insights {
922 // Disable any plan insights that use the optimizer if we only want the
923 // notice and plan optimization took longer than the threshold. This is
924 // to prevent a situation where optimization takes a while and there are
925 // lots of clusters, which would delay peek execution by roughly the product
926 // of the per-cluster optimization time and the number of clusters.
927 //
928 // (This heuristic doesn't work well, see #9492.)
929 let opt_limit = mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
930 .get(catalog.system_config().dyncfgs());
931 !(is_notice && optimizer.duration() > opt_limit)
932 } else {
933 false
934 };
935
936 Some(Box::new(PlanInsightsContext {
937 stmt: select_plan.select.as_deref().map(Clone::clone).map(Statement::Select),
938 raw_expr: raw_expr.clone(),
939 catalog: Arc::clone(catalog),
940 compute_instances,
941 target_instance: target_cluster_name,
942 metrics: optimizer.metrics().clone(),
943 finishing: optimizer.finishing().clone(),
944 optimizer_config: optimizer.config().clone(),
945 session: session_meta,
946 timestamp_context,
947 view_id: optimizer.select_id(),
948 index_id: optimizer.index_id(),
949 enable_re_optimize,
950 }))
951 };
952
953 match global_lir_plan_result {
954 Ok(Either::Left(global_lir_plan)) => {
955 // SELECT/EXPLAIN path
956 let optimizer = optimizer.unwrap_left();
957 match explain_ctx {
958 ExplainContext::Plan(explain_ctx) => {
959 let (_, df_meta, _) = global_lir_plan.unapply();
960 let insights_ctx = create_insights_ctx(&optimizer, false);
961 Ok(Execution::ExplainPlan {
962 df_meta,
963 explain_ctx,
964 optimizer,
965 insights_ctx,
966 })
967 }
968 ExplainContext::None => {
969 Ok(Execution::Peek {
970 global_lir_plan,
971 optimization_finished_at,
972 plan_insights_optimizer_trace: None,
973 insights_ctx: None,
974 })
975 }
976 ExplainContext::PlanInsightsNotice(optimizer_trace) => {
977 let insights_ctx = create_insights_ctx(&optimizer, true);
978 Ok(Execution::Peek {
979 global_lir_plan,
980 optimization_finished_at,
981 plan_insights_optimizer_trace: Some(optimizer_trace),
982 insights_ctx,
983 })
984 }
985 ExplainContext::Pushdown => {
986 let (plan, _, _) = global_lir_plan.unapply();
987 let imports = match plan {
988 PeekPlan::SlowPath(plan) => plan
989 .desc
990 .source_imports
991 .into_iter()
992 .filter_map(|(id, (desc, _, _upper))| {
993 desc.arguments.operators.map(|mfp| (id, mfp))
994 })
995 .collect(),
996 PeekPlan::FastPath(_) => std::collections::BTreeMap::default(),
997 };
998 Ok(Execution::ExplainPushdown {
999 imports,
1000 determination: determination_for_pushdown.expect("it's present for the ExplainPushdown case"),
1001 })
1002 }
1003 }
1004 }
1005 Ok(Either::Right(global_lir_plan)) => {
1006 // COPY TO S3 path
1007 Ok(Execution::CopyToS3 {
1008 global_lir_plan,
1009 source_ids: source_ids_for_closure,
1010 })
1011 }
1012 Err(err) => {
1013 if optimizer.is_right() {
1014 // COPY TO has no EXPLAIN BROKEN support
1015 return Err(err);
1016 }
1017 // SELECT/EXPLAIN error handling
1018 let optimizer = optimizer.expect_left("checked above");
1019 if let ExplainContext::Plan(explain_ctx) = explain_ctx {
1020 if explain_ctx.broken {
1021 // EXPLAIN BROKEN: log error and continue with defaults
1022 tracing::error!("error while handling EXPLAIN statement: {}", err);
1023 Ok(Execution::ExplainPlan {
1024 df_meta: Default::default(),
1025 explain_ctx,
1026 optimizer,
1027 insights_ctx: None,
1028 })
1029 } else {
1030 Err(err)
1031 }
1032 } else {
1033 Err(err)
1034 }
1035 }
1036 }
1037 })
1038 },
1039 )
1040 .await
1041 .map_err(|optimizer_error| AdapterError::Internal(format!("internal error in optimizer: {}", optimizer_error)))?;
1042
1043 // Log optimization finished
1044 if let Some(logging_id) = &statement_logging_id {
1045 self.log_lifecycle_event(*logging_id, StatementLifecycleEvent::OptimizationFinished);
1046 }
1047
1048 // Handle the optimization result: either generate EXPLAIN output or continue with execution
1049 match optimization_result {
1050 Execution::ExplainPlan {
1051 df_meta,
1052 explain_ctx,
1053 optimizer,
1054 insights_ctx,
1055 } => {
1056 let rows = coord::sequencer::explain_plan_inner(
1057 session,
1058 &catalog,
1059 df_meta,
1060 explain_ctx,
1061 optimizer,
1062 insights_ctx,
1063 )
1064 .await?;
1065
1066 Ok(Some(ExecuteResponse::SendingRowsImmediate {
1067 rows: Box::new(rows.into_row_iter()),
1068 }))
1069 }
1070 Execution::ExplainPushdown {
1071 imports,
1072 determination,
1073 } => {
1074 // # From peek_explain_pushdown
1075
1076 let as_of = determination.timestamp_context.antichain();
1077 let mz_now = determination
1078 .timestamp_context
1079 .timestamp()
1080 .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
1081 .unwrap_or_else(ResultSpec::value_all);
1082
1083 Ok(Some(
1084 coord::sequencer::explain_pushdown_future_inner(
1085 session,
1086 &*catalog,
1087 &self.storage_collections,
1088 as_of,
1089 mz_now,
1090 imports,
1091 )
1092 .await
1093 .await?,
1094 ))
1095 }
1096 Execution::Peek {
1097 global_lir_plan,
1098 optimization_finished_at: _optimization_finished_at,
1099 plan_insights_optimizer_trace,
1100 insights_ctx,
1101 } => {
1102 // Continue with normal execution
1103 // # From peek_finish
1104
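// Unpack the optimized plan into the physical peek plan (fast path or dataflow), the
// dataflow metadata (e.g., optimizer notices), and the intermediate result type.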
1105 let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
1106
1107 coord::sequencer::emit_optimizer_notices(
1108 &*catalog,
1109 session,
1110 &df_meta.optimizer_notices,
1111 );
1112
1113 // Generate plan insights notice if needed
1114 if let Some(trace) = plan_insights_optimizer_trace {
1115 let target_cluster = catalog.get_cluster(target_cluster_id);
1116 let features = OptimizerFeatures::from(catalog.system_config())
1117 .override_from(&target_cluster.config.features());
1118 let insights = trace
1119 .into_plan_insights(
1120 &features,
1121 &catalog.for_session(session),
1122 Some(select_plan.finishing.clone()),
1123 Some(target_cluster),
1124 df_meta.clone(),
1125 insights_ctx,
1126 )
1127 .await?;
1128 session.add_notice(AdapterNotice::PlanInsights(insights));
1129 }
1130
1131 // # Now back to peek_finish
1132
1133 let watch_set = statement_logging_id.map(|logging_id| {
1134 WatchSetCreation::new(
1135 logging_id,
1136 catalog.state(),
1137 &input_id_bundle,
1138 determination.timestamp_context.timestamp_or_default(),
1139 )
1140 });
1141
1142 let max_result_size = catalog.system_config().max_result_size();
1143
1144 // Clone determination if we need it for emit_timestamp_notice, since it may be
1145 // moved into Command::ExecuteSlowPathPeek.
1146 let determination_for_notice = if session.vars().emit_timestamp_notice() {
1147 Some(determination.clone())
1148 } else {
1149 None
1150 };
1151
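// Dispatch on the plan shape: fast-path peeks are issued directly from the session task,
// while slow-path (dataflow) peeks are handed off to the Coordinator.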
1152 let response = match peek_plan {
1153 PeekPlan::FastPath(fast_path_plan) => {
1154 if let Some(logging_id) = &statement_logging_id {
1155 // TODO(peek-seq): Actually, we should log it also for
1156 // FastPathPlan::Constant. The only reason we are not doing so at the
1157 // moment is to match the old peek sequencing, so that statement logging
1158 // tests pass with the frontend peek sequencing turned both on and off.
1159 //
1160 // When the old sequencing is removed, we should make a couple of
1161 // changes in how we log timestamps:
1162 // - Move this up to just after timestamp determination, so that it
1163 // appears in the log as soon as possible.
1164 // - Do it also for Constant peeks.
1165 // - Currently, slow-path peeks' timestamp logging is done by
1166 // `implement_peek_plan`. We could remove it from there, and just do
1167 // it here.
1168 if !matches!(fast_path_plan, FastPathPlan::Constant(..)) {
1169 self.log_set_timestamp(
1170 *logging_id,
1171 determination.timestamp_context.timestamp_or_default(),
1172 );
1173 }
1174 }
1175
1176 let row_set_finishing_seconds =
1177 session.metrics().row_set_finishing_seconds().clone();
1178
1179 let peek_stash_read_batch_size_bytes =
1180 mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
1181 .get(catalog.system_config().dyncfgs());
1182 let peek_stash_read_memory_budget_bytes =
1183 mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
1184 .get(catalog.system_config().dyncfgs());
1185
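// Issue the fast-path peek from here, passing along the read holds so the chosen
// timestamp stays readable until the peek completes.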
1186 self.implement_fast_path_peek_plan(
1187 fast_path_plan,
1188 determination.timestamp_context.timestamp_or_default(),
1189 select_plan.finishing,
1190 target_cluster_id,
1191 target_replica,
1192 typ,
1193 max_result_size,
1194 max_query_result_size,
1195 row_set_finishing_seconds,
1196 read_holds,
1197 peek_stash_read_batch_size_bytes,
1198 peek_stash_read_memory_budget_bytes,
1199 session.conn_id().clone(),
1200 source_ids,
1201 watch_set,
1202 )
1203 .await?
1204 }
1205 PeekPlan::SlowPath(dataflow_plan) => {
1206 {
1207 // Assert that we have some read holds for all the imports of the dataflow.
1208 for id in dataflow_plan.desc.source_imports.keys() {
1209 soft_assert_or_log!(
1210 read_holds.storage_holds.contains_key(id),
1211 "missing read hold for the source import {}",
1212 id
1213 );
1214 }
1215 for id in dataflow_plan.desc.index_imports.keys() {
1216 soft_assert_or_log!(
1217 read_holds
1218 .compute_ids()
1219 .map(|(_instance, coll)| coll)
1220 .contains(id),
1221 "missing read hold for the index import {}",
1222 id,
1223 );
1224 }
1225
1226 // Also check the holds against the as_of.
1227 for (id, h) in read_holds.storage_holds.iter() {
1228 let as_of = dataflow_plan
1229 .desc
1230 .as_of
1231 .clone()
1232 .expect("dataflow has an as_of")
1233 .into_element();
1234 soft_assert_or_log!(
1235 h.since().less_equal(&as_of),
1236 "storage read hold at {:?} for collection {} is not enough for as_of {:?}",
1237 h.since(),
1238 id,
1239 as_of
1240 );
1241 }
1242 for ((instance, id), h) in read_holds.compute_holds.iter() {
1243 soft_assert_eq_or_log!(
1244 *instance,
1245 target_cluster_id,
1246 "the read hold on {} is on the wrong cluster",
1247 id
1248 );
1249 let as_of = dataflow_plan
1250 .desc
1251 .as_of
1252 .clone()
1253 .expect("dataflow has an as_of")
1254 .into_element();
1255 soft_assert_or_log!(
1256 h.since().less_equal(&as_of),
1257 "compute read hold at {:?} for collection {} is not enough for as_of {:?}",
1258 h.since(),
1259 id,
1260 as_of
1261 );
1262 }
1263 }
1264
1265 if let Some(logging_id) = &statement_logging_id {
1266 self.log_set_transient_index_id(*logging_id, dataflow_plan.id);
1267 }
1268
1269 self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
1270 dataflow_plan: Box::new(dataflow_plan),
1271 determination,
1272 finishing: select_plan.finishing,
1273 compute_instance: target_cluster_id,
1274 target_replica,
1275 intermediate_result_type: typ,
1276 source_ids,
1277 conn_id: session.conn_id().clone(),
1278 max_result_size,
1279 max_query_result_size,
1280 watch_set,
1281 tx,
1282 })
1283 .await?
1284 }
1285 };
1286
1287 // Add timestamp notice if emit_timestamp_notice is enabled
1288 if let Some(determination) = determination_for_notice {
1289 let explanation = self
1290 .call_coordinator(|tx| Command::ExplainTimestamp {
1291 conn_id: session.conn_id().clone(),
1292 session_wall_time: session.pcx().wall_time,
1293 cluster_id: target_cluster_id,
1294 id_bundle: input_id_bundle.clone(),
1295 determination,
1296 tx,
1297 })
1298 .await;
1299 session.add_notice(AdapterNotice::QueryTimestamp { explanation });
1300 }
1301
1302 Ok(Some(match select_plan.copy_to {
1303 None => response,
1304 // COPY TO STDOUT
1305 Some(format) => ExecuteResponse::CopyTo {
1306 format,
1307 resp: Box::new(response),
1308 },
1309 }))
1310 }
1311 Execution::CopyToS3 {
1312 global_lir_plan,
1313 source_ids,
1314 } => {
1315 let (df_desc, df_meta) = global_lir_plan.unapply();
1316
1317 coord::sequencer::emit_optimizer_notices(
1318 &*catalog,
1319 session,
1320 &df_meta.optimizer_notices,
1321 );
1322
1323 // Extract S3 sink connection info for preflight check
1324 let sink_id = df_desc.sink_id();
1325 let sinks = &df_desc.sink_exports;
1326 if sinks.len() != 1 {
1327 return Err(AdapterError::Internal(
1328 "expected exactly one copy to s3 sink".into(),
1329 ));
1330 }
1331 let (_, sink_desc) = sinks
1332 .first_key_value()
1333 .expect("known to be exactly one copy to s3 sink");
1334 let s3_sink_connection = match &sink_desc.connection {
1335 mz_compute_types::sinks::ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1336 conn.clone()
1337 }
1338 _ => {
1339 return Err(AdapterError::Internal(
1340 "expected copy to s3 oneshot sink".into(),
1341 ));
1342 }
1343 };
1344
1345 // Perform S3 preflight check in background task (via coordinator).
1346 // This runs slow S3 operations without blocking the coordinator's main task.
1347 self.call_coordinator(|tx| Command::CopyToPreflight {
1348 s3_sink_connection,
1349 sink_id,
1350 tx,
1351 })
1352 .await?;
1353
1354 // Preflight succeeded; now execute the actual COPY TO dataflow.
1355 let watch_set = statement_logging_id.map(|logging_id| {
1356 WatchSetCreation::new(
1357 logging_id,
1358 catalog.state(),
1359 &input_id_bundle,
1360 determination.timestamp_context.timestamp_or_default(),
1361 )
1362 });
1363
1364 let response = self
1365 .call_coordinator(|tx| Command::ExecuteCopyTo {
1366 df_desc: Box::new(df_desc),
1367 compute_instance: target_cluster_id,
1368 target_replica,
1369 source_ids,
1370 conn_id: session.conn_id().clone(),
1371 watch_set,
1372 tx,
1373 })
1374 .await?;
1375
1376 Ok(Some(response))
1377 }
1378 }
1379 }
1380
1381 /// Determines the timestamp for a query, acquires read holds that ensure the query
1382 /// remains executable at that time, and returns both.
1383 /// The caller is responsible for eventually dropping those read holds.
1384 /// (Similar to `Coordinator::determine_timestamp`.)
1385 ///
1386 /// Note: self is taken &mut because of the lazy fetching in `get_compute_instance_client`.
1387 pub(crate) async fn frontend_determine_timestamp(
1388 &mut self,
1389 catalog_state: &CatalogState,
1390 session: &Session,
1391 id_bundle: &CollectionIdBundle,
1392 when: &QueryWhen,
1393 compute_instance: ComputeInstanceId,
1394 timeline_context: &TimelineContext,
1395 oracle_read_ts: Option<Timestamp>,
1396 real_time_recency_ts: Option<Timestamp>,
1397 ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
1398 // This is largely copy-pasted from the Coordinator's timestamp-determination logic.
1399
1400 let constraint_based = ConstraintBasedTimestampSelection::from_str(
1401 &CONSTRAINT_BASED_TIMESTAMP_SELECTION.get(catalog_state.system_config().dyncfgs()),
1402 );
1403
1404 let isolation_level = session.vars().transaction_isolation();
1405
1406 let (read_holds, upper) = self
1407 .acquire_read_holds_and_least_valid_write(id_bundle)
1408 .await
1409 .map_err(|err| {
1410 AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
1411 err,
1412 compute_instance,
1413 )
1414 })?;
1415 let (det, read_holds) = <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1416 session,
1417 id_bundle,
1418 when,
1419 compute_instance,
1420 timeline_context,
1421 oracle_read_ts,
1422 real_time_recency_ts,
1423 isolation_level,
1424 &constraint_based,
1425 read_holds,
1426 upper.clone(),
1427 )?;
1428
1429 session
1430 .metrics()
1431 .determine_timestamp(&[
1432 match det.respond_immediately() {
1433 true => "true",
1434 false => "false",
1435 },
1436 isolation_level.as_str(),
1437 &compute_instance.to_string(),
1438 constraint_based.as_str(),
1439 ])
1440 .inc();
1441 if !det.respond_immediately()
1442 && isolation_level == &IsolationLevel::StrictSerializable
1443 && real_time_recency_ts.is_none()
1444 {
1445 // Record the difference between the StrictSerializable and Serializable timestamps in a metric.
1446 if let Some(strict) = det.timestamp_context.timestamp() {
1447 let (serializable_det, _tmp_read_holds) =
1448 <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
1449 session,
1450 id_bundle,
1451 when,
1452 compute_instance,
1453 timeline_context,
1454 oracle_read_ts,
1455 real_time_recency_ts,
1456 isolation_level,
1457 &constraint_based,
1458 read_holds.clone(),
1459 upper,
1460 )?;
1461 if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
1462 session
1463 .metrics()
1464 .timestamp_difference_for_strict_serializable_ms(&[
1465 compute_instance.to_string().as_ref(),
1466 constraint_based.as_str(),
1467 ])
1468 .observe(f64::cast_lossy(u64::from(
1469 strict.saturating_sub(*serializable),
1470 )));
1471 }
1472 }
1473 }
1474
1475 Ok((det, read_holds))
1476 }
1477}