use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

use itertools::{Either, Itertools};
use mz_adapter_types::dyncfgs::CONSTRAINT_BASED_TIMESTAMP_SELECTION;
use mz_adapter_types::timestamp_selection::ConstraintBasedTimestampSelection;
use mz_compute_types::ComputeInstanceId;
use mz_controller_types::ClusterId;
use mz_expr::{CollectionPlan, ResultSpec};
use mz_ore::cast::{CastFrom, CastLossy};
use mz_ore::collections::CollectionExt;
use mz_ore::now::EpochMillis;
use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
use mz_repr::role_id::RoleId;
use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
use mz_sql::ast::Raw;
use mz_sql::catalog::CatalogCluster;
use mz_sql::plan::{self, Params, Plan, QueryWhen};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::IsolationLevel;
use mz_sql_parser::ast::{CopyDirection, CopyRelation, ExplainStage, ShowStatement, Statement};
use mz_transform::EmptyStatisticsOracle;
use mz_transform::dataflow::DataflowMetainfo;
use opentelemetry::trace::TraceContextExt;
use tracing::{Span, debug, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::catalog::{Catalog, CatalogState};
use crate::command::Command;
use crate::coord::peek::{FastPathPlan, PeekPlan};
use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle};
use crate::coord::timeline::timedomain_for;
use crate::coord::timestamp_selection::TimestampDetermination;
use crate::coord::{
    Coordinator, CopyToContext, ExecuteContextGuard, ExplainContext, ExplainPlanContext,
    TargetCluster,
};
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::OptimizerTrace;
use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
use crate::optimize::{Optimize, OptimizerError};
use crate::session::{Session, TransactionOps, TransactionStatus};
use crate::statement_logging::{
    StatementEndedExecutionReason, StatementLifecycleEvent, WatchSetCreation,
};
use crate::{
    AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, PeekClient, ReadHolds,
    TimelineContext, TimestampContext, TimestampProvider, optimize,
};
use crate::{coord, metrics};

impl PeekClient {
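    /// Attempts to sequence a peek entirely in the session frontend, without
    /// handing the statement over to the coordinator's sequencing path.
    ///
    /// Returns `Ok(None)` if the statement is not eligible for frontend peek
    /// sequencing; the caller should then fall back to the coordinator.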
    pub(crate) async fn try_frontend_peek(
        &mut self,
        portal_name: &str,
        session: &mut Session,
        outer_ctx_extra: &mut Option<ExecuteContextGuard>,
    ) -> Result<Option<ExecuteResponse>, AdapterError> {
        if session.vars().emit_trace_id_notice() {
            let span_context = tracing::Span::current()
                .context()
                .span()
                .span_context()
                .clone();
            if span_context.is_valid() {
                session.add_notice(AdapterNotice::QueryTrace {
                    trace_id: span_context.trace_id(),
                });
            }
        }

        let catalog = self.catalog_snapshot("try_frontend_peek").await;

        let (stmt, params, logging, lifecycle_timestamps) = {
            if let Err(err) = Coordinator::verify_portal(&*catalog, session, portal_name) {
                outer_ctx_extra
                    .take()
                    .and_then(|guard| guard.defuse().retire());
                return Err(err);
            }
            let portal = session
                .get_portal_unverified(portal_name)
                .expect("called verify_portal above");
            let params = portal.parameters.clone();
            let stmt = portal.stmt.clone();
            let logging = Arc::clone(&portal.logging);
            let lifecycle_timestamps = portal.lifecycle_timestamps.clone();
            (stmt, params, logging, lifecycle_timestamps)
        };

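        // Bail out early for statement kinds that frontend peek sequencing
        // does not support; the coordinator handles those.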
        if let Some(ref stmt) = stmt {
            match &**stmt {
                Statement::Select(_)
                | Statement::ExplainAnalyzeObject(_)
                | Statement::ExplainAnalyzeCluster(_)
                | Statement::Show(ShowStatement::ShowObjects(_))
                | Statement::Show(ShowStatement::ShowColumns(_)) => {}
                Statement::ExplainPlan(explain_stmt) => {
                    match &explain_stmt.explainee {
                        mz_sql_parser::ast::Explainee::Select(..) => {}
                        _ => {
                            debug!(
                                "Bailing out from try_frontend_peek, because EXPLAIN is not for a SELECT query"
                            );
                            return Ok(None);
                        }
                    }
                }
                Statement::ExplainPushdown(explain_stmt) => {
                    match &explain_stmt.explainee {
                        mz_sql_parser::ast::Explainee::Select(_, false) => {}
                        _ => {
                            debug!(
                                "Bailing out from try_frontend_peek, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is for EXPLAIN BROKEN"
                            );
                            return Ok(None);
                        }
                    }
                }
                Statement::Copy(copy_stmt) => match &copy_stmt.direction {
                    CopyDirection::To => {
                        if matches!(&copy_stmt.relation, CopyRelation::Subscribe(_)) {
                            debug!(
                                "Bailing out from try_frontend_peek, because COPY (SUBSCRIBE ...) TO is not supported"
                            );
                            return Ok(None);
                        }
                    }
                    CopyDirection::From => {
                        debug!(
                            "Bailing out from try_frontend_peek, because COPY FROM is not supported"
                        );
                        return Ok(None);
                    }
                },
                _ => {
                    debug!(
                        "Bailing out from try_frontend_peek, because statement type is not supported"
                    );
                    return Ok(None);
                }
            }
        }

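        // Begin statement logging, unless an outer execute context already
        // did so, in which case we take over its logging id.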
        let statement_logging_id = if outer_ctx_extra.is_none() {
            let result = self.statement_logging_frontend.begin_statement_execution(
                session,
                &params,
                &logging,
                catalog.system_config(),
                lifecycle_timestamps,
            );

            if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result {
                self.log_began_execution(began_execution, mseh_update, prepared_statement);
                Some(logging_id)
            } else {
                None
            }
        } else {
            outer_ctx_extra
                .take()
                .and_then(|guard| guard.defuse().retire())
        };

        let result = self
            .try_frontend_peek_inner(session, catalog, stmt, params, statement_logging_id)
            .await;

        if let Some(logging_id) = statement_logging_id {
            let reason = match &result {
                Ok(Some(ExecuteResponse::SendingRowsStreaming { .. })) => {
                    // Rows are still streaming; don't log the end of execution yet.
                    return result;
                }
                Ok(Some(resp @ ExecuteResponse::CopyTo { resp: inner, .. })) => {
                    match inner.as_ref() {
                        ExecuteResponse::SendingRowsStreaming { .. } => {
                            return result;
                        }
                        _ => resp.into(),
                    }
                }
                Ok(None) => {
                    soft_panic_or_log!(
                        "Bailed out from `try_frontend_peek_inner` after we already logged the beginning of statement execution."
                    );
                    self.log_ended_execution(
                        logging_id,
                        StatementEndedExecutionReason::Errored {
                            error: "Internal error: bailed out from `try_frontend_peek_inner`"
                                .to_string(),
                        },
                    );
                    return result;
                }
                Ok(Some(resp)) => resp.into(),
                Err(e) => StatementEndedExecutionReason::Errored {
                    error: e.to_string(),
                },
            };

            self.log_ended_execution(logging_id, reason);
        }

        result
    }

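    /// The main body of frontend peek sequencing: name resolution, planning,
    /// timestamp selection, optimization, and execution. Returns `Ok(None)`
    /// to bail out to the coordinator's sequencing path.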
    async fn try_frontend_peek_inner(
        &mut self,
        session: &mut Session,
        catalog: Arc<Catalog>,
        stmt: Option<Arc<Statement<Raw>>>,
        params: Params,
        statement_logging_id: Option<crate::statement_logging::StatementLoggingId>,
    ) -> Result<Option<ExecuteResponse>, AdapterError> {
        let stmt = match stmt {
            Some(stmt) => stmt,
            None => {
                debug!("try_frontend_peek_inner succeeded on an empty query");
                return Ok(Some(ExecuteResponse::EmptyQuery));
            }
        };

        session
            .metrics()
            .query_total(&[
                metrics::session_type_label_value(session.user()),
                metrics::statement_type_label_value(&stmt),
            ])
            .inc();

        let conn_catalog = catalog.for_session(session);
        let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?;

        let pcx = session.pcx();
        let plan = mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, &params, &resolved_ids)?;

        let (select_plan, explain_ctx, copy_to_ctx) = match &plan {
            Plan::Select(select_plan) => {
                let explain_ctx = if session.vars().emit_plan_insights_notice() {
                    let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
                    ExplainContext::PlanInsightsNotice(optimizer_trace)
                } else {
                    ExplainContext::None
                };
                (select_plan, explain_ctx, None)
            }
            Plan::ShowColumns(show_columns_plan) => {
                (&show_columns_plan.select_plan, ExplainContext::None, None)
            }
            Plan::ExplainPlan(plan::ExplainPlanPlan {
                stage,
                format,
                config,
                explainee:
                    plan::Explainee::Statement(plan::ExplaineeStatement::Select { broken, plan, desc }),
            }) => {
                let optimizer_trace = OptimizerTrace::new(stage.paths());
                let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
                    broken: *broken,
                    config: config.clone(),
                    format: *format,
                    stage: *stage,
                    replan: None,
                    desc: Some(desc.clone()),
                    optimizer_trace,
                });
                (plan, explain_ctx, None)
            }
            Plan::CopyTo(plan::CopyToPlan {
                select_plan,
                desc,
                to,
                connection,
                connection_id,
                format,
                max_file_size,
            }) => {
                let uri = eval_copy_to_uri(to.clone(), session, catalog.state())?;

                let copy_to_ctx = CopyToContext {
                    desc: desc.clone(),
                    uri,
                    connection: connection.clone(),
                    connection_id: *connection_id,
                    format: format.clone(),
                    max_file_size: *max_file_size,
                    output_batch_count: None,
                };

                (select_plan, ExplainContext::None, Some(copy_to_ctx))
            }
            Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => {
                match explainee {
                    plan::Explainee::Statement(plan::ExplaineeStatement::Select {
                        broken: false,
                        plan,
                        desc: _,
                    }) => {
                        let explain_ctx = ExplainContext::Pushdown;
                        (plan, explain_ctx, None)
                    }
                    _ => {
                        soft_panic_or_log!(
                            "unexpected EXPLAIN FILTER PUSHDOWN plan kind in frontend peek sequencing: {:?}",
                            explainee
                        );
                        debug!(
                            "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN"
                        );
                        return Ok(None);
                    }
                }
            }
            Plan::SideEffectingFunc(sef_plan) => {
                let response = self
                    .call_coordinator(|tx| Command::ExecuteSideEffectingFunc {
                        plan: sef_plan.clone(),
                        conn_id: session.conn_id().clone(),
                        current_role: session.role_metadata().current_role,
                        tx,
                    })
                    .await?;
                return Ok(Some(response));
            }
            _ => {
                soft_panic_or_log!(
                    "Unexpected plan kind in frontend peek sequencing: {:?}",
                    plan
                );
                debug!(
                    "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, side-effecting SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO S3"
                );
                return Ok(None);
            }
        };

        assert!(plan.allowed_in_read_only());

        let target_cluster = match session.transaction().cluster() {
            Some(cluster_id) => TargetCluster::Transaction(cluster_id),
            None => {
                coord::catalog_serving::auto_run_on_catalog_server(&conn_catalog, session, &plan)
            }
        };
        let (cluster, target_cluster_id, target_cluster_name) = {
            let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
            (cluster, cluster.id, &cluster.name)
        };

        if let Some(logging_id) = &statement_logging_id {
            self.log_set_cluster(*logging_id, target_cluster_id);
        }

        coord::catalog_serving::check_cluster_restrictions(
            target_cluster_name.as_str(),
            &conn_catalog,
            &plan,
        )?;

        rbac::check_plan(
            &conn_catalog,
            None::<fn(u32) -> Option<RoleId>>,
            session,
            &plan,
            Some(target_cluster_id),
            &resolved_ids,
        )?;

        if let Some((_, wait_future)) =
            coord::appends::waiting_on_startup_appends(&*catalog, session, &plan)
        {
            wait_future.await;
        }

        let max_query_result_size = Some(session.vars().max_query_result_size());

        let compute_instance_snapshot =
            ComputeInstanceSnapshot::new_without_collections(cluster.id());

        let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config())
            .override_from(&catalog.get_cluster(cluster.id()).config.features())
            .override_from(&explain_ctx);

        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
            return Err(AdapterError::NoClusterReplicasAvailable {
                name: cluster.name.clone(),
                is_managed: cluster.is_managed(),
            });
        }

        let (_, view_id) = self.transient_id_gen.allocate_id();
        let (_, index_id) = self.transient_id_gen.allocate_id();

        let mut optimizer = if let Some(mut copy_to_ctx) = copy_to_ctx {
            let worker_counts = cluster.replicas().map(|r| {
                let loc = &r.config.location;
                loc.workers().unwrap_or_else(|| loc.num_processes())
            });
            let max_worker_count = match worker_counts.max() {
                Some(count) => u64::cast_from(count),
                None => {
                    return Err(AdapterError::NoClusterReplicasAvailable {
                        name: cluster.name.clone(),
                        is_managed: cluster.is_managed(),
                    });
                }
            };
            copy_to_ctx.output_batch_count = Some(max_worker_count);

            Either::Right(optimize::copy_to::Optimizer::new(
                Arc::clone(&catalog),
                compute_instance_snapshot.clone(),
                view_id,
                copy_to_ctx,
                optimizer_config,
                self.optimizer_metrics.clone(),
            ))
        } else {
            Either::Left(optimize::peek::Optimizer::new(
                Arc::clone(&catalog),
                compute_instance_snapshot.clone(),
                select_plan.finishing.clone(),
                view_id,
                index_id,
                optimizer_config,
                self.optimizer_metrics.clone(),
            ))
        };

        let target_replica_name = session.vars().cluster_replica();
        let mut target_replica = target_replica_name
            .map(|name| {
                cluster
                    .replica_id(name)
                    .ok_or(AdapterError::UnknownClusterReplica {
                        cluster_name: cluster.name.clone(),
                        replica_name: name.to_string(),
                    })
            })
            .transpose()?;

        let source_ids = select_plan.source.depends_on();
        let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?;
        if matches!(timeline_context, TimelineContext::TimestampIndependent)
            && select_plan.source.contains_temporal()?
        {
            // A timeline-independent query that uses a temporal function such
            // as `mz_now()` still depends on the chosen query timestamp.
            timeline_context = TimelineContext::TimestampDependent;
        }

        let notices = coord::sequencer::check_log_reads(
            &catalog,
            cluster,
            &source_ids,
            &mut target_replica,
            session.vars(),
        )?;
        session.add_notices(notices);

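        // Read from the timestamp oracle only if the isolation level and the
        // `QueryWhen` actually require a linearized read timestamp.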
        let isolation_level = session.vars().transaction_isolation().clone();
        let timeline = Coordinator::get_timeline(&timeline_context);
        let needs_linearized_read_ts =
            Coordinator::needs_linearized_read_ts(&isolation_level, &select_plan.when);

        let oracle_read_ts = match timeline {
            Some(timeline) if needs_linearized_read_ts => {
                let oracle = self.ensure_oracle(timeline).await?;
                let oracle_read_ts = oracle.read_ts().await;
                Some(oracle_read_ts)
            }
            Some(_) | None => None,
        };

        let vars = session.vars();
        let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
            && !session.contains_read_timestamp()
        {
            self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
                source_ids: source_ids.clone(),
                real_time_recency_timeout: *vars.real_time_recency_timeout(),
                tx,
            })
            .await?
        } else {
            None
        };

        let dataflow_builder = DataflowBuilder::new(catalog.state(), compute_instance_snapshot);
        let input_id_bundle = dataflow_builder.sufficient_collections(source_ids.clone());

        let in_immediate_multi_stmt_txn = session
            .transaction()
            .in_immediate_multi_stmt_txn(&select_plan.when);

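        // Determine the query timestamp and acquire read holds at that
        // timestamp. Inside an immediate multi-statement transaction we must
        // reuse the transaction's existing timestamp determination and read
        // holds.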
        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
            Some(
                determination @ TimestampDetermination {
                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
                    ..
                },
            ) if in_immediate_multi_stmt_txn => {
                let txn_read_holds_opt = self
                    .call_coordinator(|tx| Command::GetTransactionReadHoldsBundle {
                        conn_id: session.conn_id().clone(),
                        tx,
                    })
                    .await;

                if let Some(txn_read_holds) = txn_read_holds_opt {
                    let allowed_id_bundle = txn_read_holds.id_bundle();
                    let outside = input_id_bundle.difference(&allowed_id_bundle);

                    if determination.timestamp_context.contains_timestamp() && !outside.is_empty()
                    {
                        let valid_names =
                            allowed_id_bundle.resolve_names(&*catalog, session.conn_id());
                        let invalid_names = outside.resolve_names(&*catalog, session.conn_id());
                        return Err(AdapterError::RelationOutsideTimeDomain {
                            relations: invalid_names,
                            names: valid_names,
                        });
                    }

                    let read_holds = txn_read_holds.subset(&input_id_bundle);

                    (determination, read_holds)
                } else {
                    return Err(AdapterError::Internal(
                        "Missing transaction read holds for multi-statement transaction"
                            .to_string(),
                    ));
                }
            }
            _ => {
                let timedomain_bundle;
                let determine_bundle = if in_immediate_multi_stmt_txn {
                    timedomain_bundle = timedomain_for(
                        &*catalog,
                        &dataflow_builder,
                        &source_ids,
                        &timeline_context,
                        session.conn_id(),
                        target_cluster_id,
                    )?;
                    &timedomain_bundle
                } else {
                    &input_id_bundle
                };
                let (determination, read_holds) = self
                    .frontend_determine_timestamp(
                        catalog.state(),
                        session,
                        determine_bundle,
                        &select_plan.when,
                        target_cluster_id,
                        &timeline_context,
                        oracle_read_ts,
                        real_time_recency_ts,
                    )
                    .await?;

                if in_immediate_multi_stmt_txn {
                    self.call_coordinator(|tx| Command::StoreTransactionReadHolds {
                        conn_id: session.conn_id().clone(),
                        read_holds: read_holds.clone(),
                        tx,
                    })
                    .await;
                }

                (determination, read_holds)
            }
        };

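        // Sanity check: we should now hold a read hold for every collection in
        // `input_id_bundle`.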
        {
            for id in input_id_bundle.iter() {
                let s = read_holds.storage_holds.contains_key(&id);
                let c = read_holds
                    .compute_ids()
                    .map(|(_instance, coll)| coll)
                    .contains(&id);
                soft_assert_or_log!(
                    s || c,
                    "missing read hold for collection {} in `input_id_bundle`",
                    id
                );
            }

            for id in input_id_bundle.storage_ids.iter() {
                soft_assert_or_log!(
                    read_holds.storage_holds.contains_key(id),
                    "missing storage read hold for collection {} in `input_id_bundle`",
                    id
                );
            }
            for id in input_id_bundle
                .compute_ids
                .iter()
                .flat_map(|(_instance, colls)| colls)
            {
                soft_assert_or_log!(
                    read_holds
                        .compute_ids()
                        .map(|(_instance, coll)| coll)
                        .contains(id),
                    "missing compute read hold for collection {} in `input_id_bundle`",
                    id,
                );
            }
        }

        let requires_linearization = (&explain_ctx).into();
        let mut transaction_determination = determination.clone();
        if select_plan.when.is_transactional() {
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id: target_cluster_id,
                requires_linearization,
            })?;
        } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
            // The query is not transactional (e.g., it uses AS OF), so its
            // timestamp should not constrain the transaction.
            transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id: target_cluster_id,
                requires_linearization,
            })?;
        };

        let stats = statistics_oracle(
            session,
            &source_ids,
            &determination.timestamp_context.antichain(),
            true,
            catalog.system_config(),
            &*self.storage_collections,
        )
        .await
        .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));

        let timestamp_context = determination.timestamp_context.clone();
        let session_meta = session.meta();
        let now = catalog.config().now.clone();
        let select_plan = select_plan.clone();
        let target_cluster_name = target_cluster_name.clone();
        let needs_plan_insights = explain_ctx.needs_plan_insights();
        let determination_for_pushdown = if matches!(explain_ctx, ExplainContext::Pushdown) {
            Some(determination.clone())
        } else {
            None
        };

        let span = Span::current();

        let catalog_for_insights = if needs_plan_insights {
            Some(Arc::clone(&catalog))
        } else {
            None
        };
        let mut compute_instances = BTreeMap::new();
        if needs_plan_insights {
            for user_cluster in catalog.user_clusters() {
                let snapshot = ComputeInstanceSnapshot::new_without_collections(user_cluster.id);
                compute_instances.insert(user_cluster.name.clone(), snapshot);
            }
        }

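        // Run the optimizer pipeline on a blocking task, bounded by the
        // statement timeout: optimization can be CPU-intensive.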
        let source_ids_for_closure = source_ids.clone();
        let optimization_future = mz_ore::task::spawn_blocking(
            || "optimize peek",
            move || {
                span.in_scope(|| {
                    let _dispatch_guard = explain_ctx.dispatch_guard();

                    let raw_expr = select_plan.source.clone();

                    let pipeline = || -> Result<
                        Either<optimize::peek::GlobalLirPlan, optimize::copy_to::GlobalLirPlan>,
                        OptimizerError,
                    > {
                        match optimizer.as_mut() {
                            Either::Left(optimizer) => {
                                let local_mir_plan =
                                    optimizer.catch_unwind_optimize(raw_expr.clone())?;
                                let local_mir_plan = local_mir_plan.resolve(
                                    timestamp_context.clone(),
                                    &session_meta,
                                    stats,
                                );
                                let global_lir_plan =
                                    optimizer.catch_unwind_optimize(local_mir_plan)?;
                                Ok(Either::Left(global_lir_plan))
                            }
                            Either::Right(optimizer) => {
                                let local_mir_plan =
                                    optimizer.catch_unwind_optimize(raw_expr.clone())?;
                                let local_mir_plan = local_mir_plan.resolve(
                                    timestamp_context.clone(),
                                    &session_meta,
                                    stats,
                                );
                                let global_lir_plan =
                                    optimizer.catch_unwind_optimize(local_mir_plan)?;
                                Ok(Either::Right(global_lir_plan))
                            }
                        }
                    };

                    let global_lir_plan_result = pipeline();
                    let optimization_finished_at = now();

                    let create_insights_ctx = |optimizer: &optimize::peek::Optimizer,
                                               is_notice: bool|
                     -> Option<Box<PlanInsightsContext>> {
                        if !needs_plan_insights {
                            return None;
                        }

                        let catalog = catalog_for_insights.as_ref()?;

                        let enable_re_optimize = if needs_plan_insights {
                            let opt_limit = mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
                                .get(catalog.system_config().dyncfgs());
                            !(is_notice && optimizer.duration() > opt_limit)
                        } else {
                            false
                        };

                        Some(Box::new(PlanInsightsContext {
                            stmt: select_plan
                                .select
                                .as_deref()
                                .map(Clone::clone)
                                .map(Statement::Select),
                            raw_expr: raw_expr.clone(),
                            catalog: Arc::clone(catalog),
                            compute_instances,
                            target_instance: target_cluster_name,
                            metrics: optimizer.metrics().clone(),
                            finishing: optimizer.finishing().clone(),
                            optimizer_config: optimizer.config().clone(),
                            session: session_meta,
                            timestamp_context,
                            view_id: optimizer.select_id(),
                            index_id: optimizer.index_id(),
                            enable_re_optimize,
                        }))
                    };

                    match global_lir_plan_result {
                        Ok(Either::Left(global_lir_plan)) => {
                            let optimizer = optimizer.unwrap_left();
                            match explain_ctx {
                                ExplainContext::Plan(explain_ctx) => {
                                    let (_, df_meta, _) = global_lir_plan.unapply();
                                    let insights_ctx = create_insights_ctx(&optimizer, false);
                                    Ok(Execution::ExplainPlan {
                                        df_meta,
                                        explain_ctx,
                                        optimizer,
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::None => Ok(Execution::Peek {
                                    global_lir_plan,
                                    optimization_finished_at,
                                    plan_insights_optimizer_trace: None,
                                    insights_ctx: None,
                                }),
                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
                                    let insights_ctx = create_insights_ctx(&optimizer, true);
                                    Ok(Execution::Peek {
                                        global_lir_plan,
                                        optimization_finished_at,
                                        plan_insights_optimizer_trace: Some(optimizer_trace),
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::Pushdown => {
                                    let (plan, _, _) = global_lir_plan.unapply();
                                    let imports = match plan {
                                        PeekPlan::SlowPath(plan) => plan
                                            .desc
                                            .source_imports
                                            .into_iter()
                                            .filter_map(|(id, import)| {
                                                import.desc.arguments.operators.map(|mfp| (id, mfp))
                                            })
                                            .collect(),
                                        PeekPlan::FastPath(_) => BTreeMap::default(),
                                    };
                                    Ok(Execution::ExplainPushdown {
                                        imports,
                                        determination: determination_for_pushdown
                                            .expect("it's present for the ExplainPushdown case"),
                                    })
                                }
                            }
                        }
                        Ok(Either::Right(global_lir_plan)) => Ok(Execution::CopyToS3 {
                            global_lir_plan,
                            source_ids: source_ids_for_closure,
                        }),
                        Err(err) => {
                            if optimizer.is_right() {
                                return Err(err);
                            }
                            let optimizer = optimizer.expect_left("checked above");
                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
                                if explain_ctx.broken {
                                    tracing::error!(
                                        "error while handling EXPLAIN statement: {}",
                                        err
                                    );
                                    Ok(Execution::ExplainPlan {
                                        df_meta: Default::default(),
                                        explain_ctx,
                                        optimizer,
                                        insights_ctx: None,
                                    })
                                } else {
                                    Err(err)
                                }
                            } else {
                                Err(err)
                            }
                        }
                    }
                })
            },
        );
        let optimization_timeout = *session.vars().statement_timeout();
        let optimization_result =
            match tokio::time::timeout(optimization_timeout, optimization_future).await {
                Ok(Ok(result)) => result,
                Ok(Err(optimizer_error)) => {
                    return Err(AdapterError::Internal(format!(
                        "internal error in optimizer: {}",
                        optimizer_error
                    )));
                }
                Err(_elapsed) => {
                    warn!("optimize peek timed out after {:?}", optimization_timeout);
                    return Err(AdapterError::StatementTimeout);
                }
            };

        if let Some(logging_id) = &statement_logging_id {
            self.log_lifecycle_event(*logging_id, StatementLifecycleEvent::OptimizationFinished);
        }

        Self::assert_read_holds_correct(
            &read_holds,
            &optimization_result,
            &determination,
            target_cluster_id,
        );

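        // Execute the statement according to the execution kind that
        // optimization settled on.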
        match optimization_result {
            Execution::ExplainPlan {
                df_meta,
                explain_ctx,
                optimizer,
                insights_ctx,
            } => {
                let rows = coord::sequencer::explain_plan_inner(
                    session,
                    &catalog,
                    df_meta,
                    explain_ctx,
                    optimizer,
                    insights_ctx,
                )
                .await?;

                Ok(Some(ExecuteResponse::SendingRowsImmediate {
                    rows: Box::new(rows.into_row_iter()),
                }))
            }
            Execution::ExplainPushdown {
                imports,
                determination,
            } => {
                let as_of = determination.timestamp_context.antichain();
                let mz_now = determination
                    .timestamp_context
                    .timestamp()
                    .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
                    .unwrap_or_else(ResultSpec::value_all);

                Ok(Some(
                    coord::sequencer::explain_pushdown_future_inner(
                        session,
                        &*catalog,
                        &self.storage_collections,
                        as_of,
                        mz_now,
                        imports,
                    )
                    .await
                    .await?,
                ))
            }
            Execution::Peek {
                global_lir_plan,
                optimization_finished_at: _optimization_finished_at,
                plan_insights_optimizer_trace,
                insights_ctx,
            } => {
                let (peek_plan, df_meta, typ) = global_lir_plan.unapply();

                coord::sequencer::emit_optimizer_notices(
                    &*catalog,
                    session,
                    &df_meta.optimizer_notices,
                );

                if let Some(trace) = plan_insights_optimizer_trace {
                    let target_cluster = catalog.get_cluster(target_cluster_id);
                    let features = OptimizerFeatures::from(catalog.system_config())
                        .override_from(&target_cluster.config.features());
                    let insights = trace
                        .into_plan_insights(
                            &features,
                            &catalog.for_session(session),
                            Some(select_plan.finishing.clone()),
                            Some(target_cluster),
                            df_meta.clone(),
                            insights_ctx,
                        )
                        .await?;
                    session.add_notice(AdapterNotice::PlanInsights(insights));
                }

                let watch_set = statement_logging_id.map(|logging_id| {
                    WatchSetCreation::new(
                        logging_id,
                        catalog.state(),
                        &input_id_bundle,
                        determination.timestamp_context.timestamp_or_default(),
                    )
                });

                let max_result_size = catalog.system_config().max_result_size();

                let determination_for_notice = if session.vars().emit_timestamp_notice() {
                    Some(determination.clone())
                } else {
                    None
                };

                let response = match peek_plan {
                    PeekPlan::FastPath(fast_path_plan) => {
                        if let Some(logging_id) = &statement_logging_id {
                            if !matches!(fast_path_plan, FastPathPlan::Constant(..)) {
                                self.log_set_timestamp(
                                    *logging_id,
                                    determination.timestamp_context.timestamp_or_default(),
                                );
                            }
                        }

                        let row_set_finishing_seconds =
                            session.metrics().row_set_finishing_seconds().clone();

                        let peek_stash_read_batch_size_bytes =
                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
                                .get(catalog.system_config().dyncfgs());
                        let peek_stash_read_memory_budget_bytes =
                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
                                .get(catalog.system_config().dyncfgs());

                        self.implement_fast_path_peek_plan(
                            fast_path_plan,
                            determination.timestamp_context.timestamp_or_default(),
                            select_plan.finishing,
                            target_cluster_id,
                            target_replica,
                            typ,
                            max_result_size,
                            max_query_result_size,
                            row_set_finishing_seconds,
                            read_holds,
                            peek_stash_read_batch_size_bytes,
                            peek_stash_read_memory_budget_bytes,
                            session.conn_id().clone(),
                            source_ids,
                            watch_set,
                        )
                        .await?
                    }
                    PeekPlan::SlowPath(dataflow_plan) => {
                        if let Some(logging_id) = &statement_logging_id {
                            self.log_set_transient_index_id(*logging_id, dataflow_plan.id);
                        }

                        self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
                            dataflow_plan: Box::new(dataflow_plan),
                            determination,
                            finishing: select_plan.finishing,
                            compute_instance: target_cluster_id,
                            target_replica,
                            intermediate_result_type: typ,
                            source_ids,
                            conn_id: session.conn_id().clone(),
                            max_result_size,
                            max_query_result_size,
                            watch_set,
                            tx,
                        })
                        .await?
                    }
                };

                if let Some(determination) = determination_for_notice {
                    let explanation = self
                        .call_coordinator(|tx| Command::ExplainTimestamp {
                            conn_id: session.conn_id().clone(),
                            session_wall_time: session.pcx().wall_time,
                            cluster_id: target_cluster_id,
                            id_bundle: input_id_bundle.clone(),
                            determination,
                            tx,
                        })
                        .await;
                    session.add_notice(AdapterNotice::QueryTimestamp { explanation });
                }

                Ok(Some(match select_plan.copy_to {
                    None => response,
                    Some(format) => ExecuteResponse::CopyTo {
                        format,
                        resp: Box::new(response),
                    },
                }))
            }
            Execution::CopyToS3 {
                global_lir_plan,
                source_ids,
            } => {
                let (df_desc, df_meta) = global_lir_plan.unapply();

                coord::sequencer::emit_optimizer_notices(
                    &*catalog,
                    session,
                    &df_meta.optimizer_notices,
                );

                let sink_id = df_desc.sink_id();
                let sinks = &df_desc.sink_exports;
                if sinks.len() != 1 {
                    return Err(AdapterError::Internal(
                        "expected exactly one copy to s3 sink".into(),
                    ));
                }
                let (_, sink_desc) = sinks
                    .first_key_value()
                    .expect("known to be exactly one copy to s3 sink");
                let s3_sink_connection = match &sink_desc.connection {
                    mz_compute_types::sinks::ComputeSinkConnection::CopyToS3Oneshot(conn) => {
                        conn.clone()
                    }
                    _ => {
                        return Err(AdapterError::Internal(
                            "expected copy to s3 oneshot sink".into(),
                        ));
                    }
                };

                self.call_coordinator(|tx| Command::CopyToPreflight {
                    s3_sink_connection,
                    sink_id,
                    tx,
                })
                .await?;

                let watch_set = statement_logging_id.map(|logging_id| {
                    WatchSetCreation::new(
                        logging_id,
                        catalog.state(),
                        &input_id_bundle,
                        determination.timestamp_context.timestamp_or_default(),
                    )
                });

                let response = self
                    .call_coordinator(|tx| Command::ExecuteCopyTo {
                        df_desc: Box::new(df_desc),
                        compute_instance: target_cluster_id,
                        target_replica,
                        source_ids,
                        conn_id: session.conn_id().clone(),
                        watch_set,
                        tx,
                    })
                    .await?;

                Ok(Some(response))
            }
        }
    }

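    /// Determines a timestamp for reading from the given collections on the
    /// given cluster, acquiring read holds that cover it. This mirrors the
    /// coordinator's timestamp determination, but runs in the frontend.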
    pub(crate) async fn frontend_determine_timestamp(
        &mut self,
        catalog_state: &CatalogState,
        session: &Session,
        id_bundle: &CollectionIdBundle,
        when: &QueryWhen,
        compute_instance: ComputeInstanceId,
        timeline_context: &TimelineContext,
        oracle_read_ts: Option<Timestamp>,
        real_time_recency_ts: Option<Timestamp>,
    ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
        let constraint_based = ConstraintBasedTimestampSelection::from_str(
            &CONSTRAINT_BASED_TIMESTAMP_SELECTION.get(catalog_state.system_config().dyncfgs()),
        );

        let isolation_level = session.vars().transaction_isolation();

        let (read_holds, upper) = self
            .acquire_read_holds_and_least_valid_write(id_bundle)
            .await
            .map_err(|err| {
                AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
                    err,
                    compute_instance,
                )
            })?;
        let (det, read_holds) = <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
            session,
            id_bundle,
            when,
            compute_instance,
            timeline_context,
            oracle_read_ts,
            real_time_recency_ts,
            isolation_level,
            &constraint_based,
            read_holds,
            upper.clone(),
        )?;

        session
            .metrics()
            .determine_timestamp(&[
                match det.respond_immediately() {
                    true => "true",
                    false => "false",
                },
                isolation_level.as_str(),
                &compute_instance.to_string(),
                constraint_based.as_str(),
            ])
            .inc();
        if !det.respond_immediately()
            && isolation_level == &IsolationLevel::StrictSerializable
            && real_time_recency_ts.is_none()
        {
            if let Some(strict) = det.timestamp_context.timestamp() {
                // Also determine the timestamp that Serializable isolation
                // would have chosen, to record how much Strict Serializable
                // costs us.
                let (serializable_det, _tmp_read_holds) =
                    <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
                        session,
                        id_bundle,
                        when,
                        compute_instance,
                        timeline_context,
                        oracle_read_ts,
                        real_time_recency_ts,
                        &IsolationLevel::Serializable,
                        &constraint_based,
                        read_holds.clone(),
                        upper,
                    )?;
                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
                    session
                        .metrics()
                        .timestamp_difference_for_strict_serializable_ms(&[
                            compute_instance.to_string().as_ref(),
                            constraint_based.as_str(),
                        ])
                        .observe(f64::cast_lossy(u64::from(
                            strict.saturating_sub(*serializable),
                        )));
                }
            }
        }

        Ok((det, read_holds))
    }

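    /// Soft-asserts that `read_holds` covers all source and index imports of
    /// the plan we are about to execute, and that each hold's `since` is not
    /// beyond the chosen `as_of`.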
    fn assert_read_holds_correct(
        read_holds: &ReadHolds<Timestamp>,
        execution: &Execution,
        determination: &TimestampDetermination<Timestamp>,
        target_cluster_id: ClusterId,
    ) {
        let (source_imports, index_imports, as_of, execution_name): (
            Vec<GlobalId>,
            Vec<GlobalId>,
            Timestamp,
            &str,
        ) = match execution {
            Execution::Peek {
                global_lir_plan, ..
            } => match global_lir_plan.peek_plan() {
                PeekPlan::FastPath(fast_path_plan) => {
                    let (sources, indexes) = match fast_path_plan {
                        FastPathPlan::Constant(..) => (vec![], vec![]),
                        FastPathPlan::PeekExisting(_coll_id, idx_id, ..) => (vec![], vec![*idx_id]),
                        FastPathPlan::PeekPersist(global_id, ..) => (vec![*global_id], vec![]),
                    };
                    (
                        sources,
                        indexes,
                        determination.timestamp_context.timestamp_or_default(),
                        "FastPath",
                    )
                }
                PeekPlan::SlowPath(dataflow_plan) => {
                    let as_of = dataflow_plan
                        .desc
                        .as_of
                        .clone()
                        .expect("dataflow has an as_of")
                        .into_element();
                    (
                        dataflow_plan.desc.source_imports.keys().cloned().collect(),
                        dataflow_plan.desc.index_imports.keys().cloned().collect(),
                        as_of,
                        "SlowPath",
                    )
                }
            },
            Execution::CopyToS3 {
                global_lir_plan, ..
            } => {
                let df_desc = global_lir_plan.df_desc();
                let as_of = df_desc
                    .as_of
                    .clone()
                    .expect("dataflow has an as_of")
                    .into_element();
                (
                    df_desc.source_imports.keys().cloned().collect(),
                    df_desc.index_imports.keys().cloned().collect(),
                    as_of,
                    "CopyToS3",
                )
            }
            Execution::ExplainPlan { .. } | Execution::ExplainPushdown { .. } => {
                // These don't execute anything on a cluster, so there is
                // nothing to check.
                return;
            }
        };

        for id in source_imports.iter() {
            soft_assert_or_log!(
                read_holds.storage_holds.contains_key(id),
                "[{}] missing read hold for the source import {}",
                execution_name,
                id
            );
        }
        for id in index_imports.iter() {
            soft_assert_or_log!(
                read_holds
                    .compute_ids()
                    .map(|(_instance, coll)| coll)
                    .contains(id),
                "[{}] missing read hold for the index import {}",
                execution_name,
                id,
            );
        }

        for (id, h) in read_holds.storage_holds.iter() {
            soft_assert_or_log!(
                h.since().less_equal(&as_of),
                "[{}] storage read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}",
                execution_name,
                h.since(),
                id,
                as_of,
                determination
            );
        }
        for ((instance, id), h) in read_holds.compute_holds.iter() {
            soft_assert_eq_or_log!(
                *instance,
                target_cluster_id,
                "[{}] the read hold on {} is on the wrong cluster",
                execution_name,
                id
            );
            soft_assert_or_log!(
                h.since().less_equal(&as_of),
                "[{}] compute read hold at {:?} for collection {} is not enough for as_of {:?}, determination: {:?}",
                execution_name,
                h.since(),
                id,
                as_of,
                determination
            );
        }
    }
}

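/// The possible execution paths after optimization has finished.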
enum Execution {
    Peek {
        global_lir_plan: optimize::peek::GlobalLirPlan,
        optimization_finished_at: EpochMillis,
        plan_insights_optimizer_trace: Option<OptimizerTrace>,
        insights_ctx: Option<Box<PlanInsightsContext>>,
    },
    CopyToS3 {
        global_lir_plan: optimize::copy_to::GlobalLirPlan,
        source_ids: BTreeSet<GlobalId>,
    },
    ExplainPlan {
        df_meta: DataflowMetainfo,
        explain_ctx: ExplainPlanContext,
        optimizer: optimize::peek::Optimizer,
        insights_ctx: Option<Box<PlanInsightsContext>>,
    },
    ExplainPushdown {
        imports: BTreeMap<GlobalId, mz_expr::MapFilterProject>,
        determination: TimestampDetermination<Timestamp>,
    },
}