use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::sync::Arc;

use itertools::Either;
use mz_adapter_types::dyncfgs::CONSTRAINT_BASED_TIMESTAMP_SELECTION;
use mz_adapter_types::timestamp_selection::ConstraintBasedTimestampSelection;
use mz_compute_types::ComputeInstanceId;
use mz_expr::{CollectionPlan, ResultSpec};
use mz_ore::cast::{CastFrom, CastLossy};
use mz_ore::now::EpochMillis;
use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
use mz_repr::role_id::RoleId;
use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
use mz_sql::catalog::CatalogCluster;
use mz_sql::plan::{self, Plan, QueryWhen};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::IsolationLevel;
use mz_sql_parser::ast::{CopyDirection, ExplainStage, Statement};
use mz_transform::EmptyStatisticsOracle;
use mz_transform::dataflow::DataflowMetainfo;
use opentelemetry::trace::TraceContextExt;
use tracing::{Span, debug};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::catalog::CatalogState;
use crate::command::Command;
use crate::coord::peek::PeekPlan;
use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle};
use crate::coord::timeline::timedomain_for;
use crate::coord::timestamp_selection::TimestampDetermination;
use crate::coord::{Coordinator, CopyToContext, ExplainContext, ExplainPlanContext, TargetCluster};
use crate::explain::insights::PlanInsightsContext;
use crate::explain::optimizer_trace::OptimizerTrace;
use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder};
use crate::optimize::{Optimize, OptimizerError};
use crate::session::{Session, TransactionOps, TransactionStatus};
use crate::{
    AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, PeekClient, ReadHolds,
    TimelineContext, TimestampContext, TimestampProvider, optimize,
};
use crate::{coord, metrics};

impl PeekClient {
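    /// Attempts to execute a peek (`SELECT`, supported `EXPLAIN` variants, or `COPY ... TO`)
    /// directly from the frontend. Returns `Ok(None)` to signal that the statement or session
    /// state is not supported here and that the caller should fall back to the regular
    /// coordinator-sequenced path.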
    pub(crate) async fn try_frontend_peek_inner(
        &mut self,
        portal_name: &str,
        session: &mut Session,
    ) -> Result<Option<ExecuteResponse>, AdapterError> {
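        // The frontend peek path does not produce the timestamp notice, so fall back when the
        // session has it enabled.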
        if session.vars().emit_timestamp_notice() {
            debug!("Bailing out from try_frontend_peek_inner, because emit_timestamp_notice");
            return Ok(None);
        }

        if session.vars().emit_trace_id_notice() {
            let span_context = tracing::Span::current()
                .context()
                .span()
                .span_context()
                .clone();
            if span_context.is_valid() {
                session.add_notice(AdapterNotice::QueryTrace {
                    trace_id: span_context.trace_id(),
                });
            }
        }

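        // Grab a point-in-time snapshot of the catalog; everything below plans and optimizes
        // against this consistent snapshot.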
        let catalog = self.catalog_snapshot("try_frontend_peek_inner").await;

        if let Err(_) = Coordinator::verify_portal(&*catalog, session, portal_name) {
            debug!(
                "Bailing out from try_frontend_peek_inner, because verify_portal returned an error"
            );
            return Ok(None);
        }

        let (stmt, params) = {
            let portal = session
                .get_portal_unverified(portal_name)
                .expect("called verify_portal above");
            let params = portal.parameters.clone();
            let stmt = portal.stmt.clone();
            (stmt, params)
        };

        let stmt = match stmt {
            Some(stmt) => stmt,
            None => {
                debug!("try_frontend_peek_inner succeeded on an empty query");
                return Ok(Some(ExecuteResponse::EmptyQuery));
            }
        };

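        // Cheap, syntax-level check of the statement type before doing any name resolution or
        // planning: only SELECTs, certain EXPLAINs, and COPY ... TO can be handled here.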
        match &*stmt {
            Statement::Select(_)
            | Statement::ExplainAnalyzeObject(_)
            | Statement::ExplainAnalyzeCluster(_) => {}
            Statement::ExplainPlan(explain_stmt) => {
                match &explain_stmt.explainee {
                    mz_sql_parser::ast::Explainee::Select(..) => {}
                    _ => {
                        debug!(
                            "Bailing out from try_frontend_peek_inner, because EXPLAIN is not for a SELECT query"
                        );
                        return Ok(None);
                    }
                }
            }
            Statement::ExplainPushdown(explain_stmt) => {
                match &explain_stmt.explainee {
                    mz_sql_parser::ast::Explainee::Select(..) => {}
                    _ => {
                        debug!(
                            "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query"
                        );
                        return Ok(None);
                    }
                }
            }
            Statement::Copy(copy_stmt) => {
                match &copy_stmt.direction {
                    CopyDirection::To => {}
                    CopyDirection::From => {
                        debug!(
                            "Bailing out from try_frontend_peek_inner, because COPY FROM is not supported"
                        );
                        return Ok(None);
                    }
                }
            }
            _ => {
                debug!(
                    "Bailing out from try_frontend_peek_inner, because statement type is not supported"
                );
                return Ok(None);
            }
        }

        let session_type = metrics::session_type_label_value(session.user());
        let stmt_type = metrics::statement_type_label_value(&stmt);

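        // Resolve names and plan the statement against the catalog snapshot; planning errors are
        // propagated as `AdapterError`s rather than triggering a fallback.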
        let conn_catalog = catalog.for_session(session);
        let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?;

        let pcx = session.pcx();
        let plan = mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, &params, &resolved_ids)?;
        let (select_plan, explain_ctx, copy_to_ctx) = match &plan {
            Plan::Select(select_plan) => {
                let explain_ctx = if session.vars().emit_plan_insights_notice() {
                    let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
                    ExplainContext::PlanInsightsNotice(optimizer_trace)
                } else {
                    ExplainContext::None
                };
                (select_plan, explain_ctx, None)
            }
            Plan::ExplainPlan(plan::ExplainPlanPlan {
                stage,
                format,
                config,
                explainee:
                    plan::Explainee::Statement(plan::ExplaineeStatement::Select { broken, plan, desc }),
            }) => {
                let optimizer_trace = OptimizerTrace::new(stage.paths());
                let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
                    broken: *broken,
                    config: config.clone(),
                    format: *format,
                    stage: *stage,
                    replan: None,
                    desc: Some(desc.clone()),
                    optimizer_trace,
                });
                (plan, explain_ctx, None)
            }
            Plan::CopyTo(plan::CopyToPlan {
                select_plan,
                desc,
                to,
                connection,
                connection_id,
                format,
                max_file_size,
            }) => {
                let uri = eval_copy_to_uri(to.clone(), session, catalog.state())?;

                let copy_to_ctx = CopyToContext {
                    desc: desc.clone(),
                    uri,
                    connection: connection.clone(),
                    connection_id: *connection_id,
                    format: format.clone(),
                    max_file_size: *max_file_size,
                    output_batch_count: None,
                };

                (select_plan, ExplainContext::None, Some(copy_to_ctx))
            }
            Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => {
                match explainee {
                    plan::Explainee::Statement(plan::ExplaineeStatement::Select {
                        broken: false,
                        plan,
                        desc: _,
                    }) => {
                        let explain_ctx = ExplainContext::Pushdown;
                        (plan, explain_ctx, None)
                    }
                    _ => {
                        debug!(
                            "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN"
                        );
                        return Ok(None);
                    }
                }
            }
            _ => {
                debug!(
                    "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO"
                );
                return Ok(None);
            }
        };

        assert!(plan.allowed_in_read_only());

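        // Pick the target cluster: a transaction pinned to a cluster keeps it; otherwise
        // `auto_run_on_catalog_server` decides, which may route catalog-only queries to the
        // catalog server cluster.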
        let target_cluster = match session.transaction().cluster() {
            Some(cluster_id) => TargetCluster::Transaction(cluster_id),
            None => {
                coord::catalog_serving::auto_run_on_catalog_server(&conn_catalog, session, &plan)
            }
        };
        let (cluster, target_cluster_id, target_cluster_name) = {
            let cluster = catalog.resolve_target_cluster(target_cluster, session)?;
            (cluster, cluster.id, &cluster.name)
        };

        coord::catalog_serving::check_cluster_restrictions(
            target_cluster_name.as_str(),
            &conn_catalog,
            &plan,
        )?;

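        // RBAC check against the resolved plan before doing any real work.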
        rbac::check_plan(
            &conn_catalog,
            None::<fn(u32) -> Option<RoleId>>,
            session,
            &plan,
            Some(target_cluster_id),
            &resolved_ids,
        )?;

        if let Some(_) = coord::appends::waiting_on_startup_appends(&*catalog, session, &plan) {
            debug!("Bailing out from try_frontend_peek_inner, because waiting_on_startup_appends");
            return Ok(None);
        }

        let max_query_result_size = Some(session.vars().max_query_result_size());

        let compute_instance_snapshot =
            ComputeInstanceSnapshot::new_without_collections(cluster.id());

        let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config())
            .override_from(&catalog.get_cluster(cluster.id()).config.features())
            .override_from(&explain_ctx);

        if cluster.replicas().next().is_none() && explain_ctx.needs_cluster() {
            return Err(AdapterError::NoClusterReplicasAvailable {
                name: cluster.name.clone(),
                is_managed: cluster.is_managed(),
            });
        }

        let (_, view_id) = self.transient_id_gen.allocate_id();
        let (_, index_id) = self.transient_id_gen.allocate_id();

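        // Build the optimizer: a COPY TO optimizer when we have a copy-to context, otherwise the
        // regular peek optimizer. Both run against the compute instance snapshot.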
        let mut optimizer = if let Some(mut copy_to_ctx) = copy_to_ctx {
            let worker_counts = cluster.replicas().map(|r| {
                let loc = &r.config.location;
                loc.workers().unwrap_or_else(|| loc.num_processes())
            });
            let max_worker_count = match worker_counts.max() {
                Some(count) => u64::cast_from(count),
                None => {
                    return Err(AdapterError::NoClusterReplicasAvailable {
                        name: cluster.name.clone(),
                        is_managed: cluster.is_managed(),
                    });
                }
            };
            copy_to_ctx.output_batch_count = Some(max_worker_count);

            Either::Right(optimize::copy_to::Optimizer::new(
                Arc::clone(&catalog),
                compute_instance_snapshot.clone(),
                view_id,
                copy_to_ctx,
                optimizer_config,
                self.optimizer_metrics.clone(),
            ))
        } else {
            Either::Left(optimize::peek::Optimizer::new(
                Arc::clone(&catalog),
                compute_instance_snapshot.clone(),
                select_plan.finishing.clone(),
                view_id,
                index_id,
                optimizer_config,
                self.optimizer_metrics.clone(),
            ))
        };

        let target_replica_name = session.vars().cluster_replica();
        let mut target_replica = target_replica_name
            .map(|name| {
                cluster
                    .replica_id(name)
                    .ok_or(AdapterError::UnknownClusterReplica {
                        cluster_name: cluster.name.clone(),
                        replica_name: name.to_string(),
                    })
            })
            .transpose()?;

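        // Determine the timeline of the query's inputs. A timestamp-independent query that
        // contains temporal expressions still needs a timestamp, so it is treated as
        // timestamp-dependent.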
        let source_ids = select_plan.source.depends_on();
        let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?;
        if matches!(timeline_context, TimelineContext::TimestampIndependent)
            && select_plan.source.contains_temporal()?
        {
            timeline_context = TimelineContext::TimestampDependent;
        }

        let notices = coord::sequencer::check_log_reads(
            &catalog,
            cluster,
            &source_ids,
            &mut target_replica,
            session.vars(),
        )?;
        session.add_notices(notices);

        let isolation_level = session.vars().transaction_isolation().clone();
        let timeline = Coordinator::get_timeline(&timeline_context);
        let needs_linearized_read_ts =
            Coordinator::needs_linearized_read_ts(&isolation_level, &select_plan.when);

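        // When the isolation level requires linearized reads, ask the timestamp oracle of the
        // query's timeline for a read timestamp.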
        let oracle_read_ts = match timeline {
            Some(timeline) if needs_linearized_read_ts => {
                let oracle = self.ensure_oracle(timeline).await?;
                let oracle_read_ts = oracle.read_ts().await;
                Some(oracle_read_ts)
            }
            Some(_) | None => None,
        };

        let vars = session.vars();
        let real_time_recency_ts: Option<Timestamp> = if vars.real_time_recency()
            && vars.transaction_isolation() == &IsolationLevel::StrictSerializable
            && !session.contains_read_timestamp()
        {
            self.call_coordinator(|tx| Command::DetermineRealTimeRecentTimestamp {
                source_ids: source_ids.clone(),
                real_time_recency_timeout: *vars.real_time_recency_timeout(),
                tx,
            })
            .await?
        } else {
            None
        };

        let dataflow_builder = DataflowBuilder::new(catalog.state(), compute_instance_snapshot);
        let input_id_bundle = dataflow_builder.sufficient_collections(source_ids.clone());

        let in_immediate_multi_stmt_txn = session
            .transaction()
            .in_immediate_multi_stmt_txn(&select_plan.when);

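        // Determine the query timestamp and acquire read holds. Inside an immediate
        // multi-statement transaction we reuse the transaction's existing timestamp and read
        // holds (stored at the coordinator); otherwise we determine a fresh timestamp here.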
        let (determination, read_holds) = match session.get_transaction_timestamp_determination() {
            Some(
                determination @ TimestampDetermination {
                    timestamp_context: TimestampContext::TimelineTimestamp { .. },
                    ..
                },
            ) if in_immediate_multi_stmt_txn => {
                let txn_read_holds_opt = self
                    .call_coordinator(|tx| Command::GetTransactionReadHoldsBundle {
                        conn_id: session.conn_id().clone(),
                        tx,
                    })
                    .await;

                if let Some(txn_read_holds) = txn_read_holds_opt {
                    let allowed_id_bundle = txn_read_holds.id_bundle();
                    let outside = input_id_bundle.difference(&allowed_id_bundle);

                    if determination.timestamp_context.contains_timestamp() && !outside.is_empty() {
                        let valid_names =
                            allowed_id_bundle.resolve_names(&*catalog, session.conn_id());
                        let invalid_names = outside.resolve_names(&*catalog, session.conn_id());
                        return Err(AdapterError::RelationOutsideTimeDomain {
                            relations: invalid_names,
                            names: valid_names,
                        });
                    }

                    let read_holds = txn_read_holds.subset(&input_id_bundle);

                    (determination, read_holds)
                } else {
                    return Err(AdapterError::Internal(
                        "Missing transaction read holds for multi-statement transaction"
                            .to_string(),
                    ));
                }
            }
            _ => {
                let timedomain_bundle;
                let determine_bundle = if in_immediate_multi_stmt_txn {
                    timedomain_bundle = timedomain_for(
                        &*catalog,
                        &dataflow_builder,
                        &source_ids,
                        &timeline_context,
                        session.conn_id(),
                        target_cluster_id,
                    )?;
                    &timedomain_bundle
                } else {
                    &input_id_bundle
                };
                let (determination, read_holds) = self
                    .frontend_determine_timestamp(
                        catalog.state(),
                        session,
                        determine_bundle,
                        &select_plan.when,
                        target_cluster_id,
                        &timeline_context,
                        oracle_read_ts,
                        real_time_recency_ts,
                    )
                    .await?;

                if in_immediate_multi_stmt_txn {
                    self.call_coordinator(|tx| Command::StoreTransactionReadHolds {
                        conn_id: session.conn_id().clone(),
                        read_holds: read_holds.clone(),
                        tx,
                    })
                    .await;
                }

                (determination, read_holds)
            }
        };

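        // Record the peek's timestamp determination in the transaction so later statements in the
        // same transaction observe a consistent view; non-transactional reads inside an open
        // transaction are recorded without a timestamp.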
        let requires_linearization = (&explain_ctx).into();
        let mut transaction_determination = determination.clone();
        if select_plan.when.is_transactional() {
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id: target_cluster_id,
                requires_linearization,
            })?;
        } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) {
            transaction_determination.timestamp_context = TimestampContext::NoTimestamp;
            session.add_transaction_ops(TransactionOps::Peeks {
                determination: transaction_determination,
                cluster_id: target_cluster_id,
                requires_linearization,
            })?;
        };

        let stats = statistics_oracle(
            session,
            &source_ids,
            &determination.timestamp_context.antichain(),
            true,
            catalog.system_config(),
            &*self.storage_collections,
        )
        .await
        .unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));

        let timestamp_context = determination.timestamp_context.clone();
        let session_meta = session.meta();
        let now = catalog.config().now.clone();
        let select_plan = select_plan.clone();
        let target_cluster_name = target_cluster_name.clone();
        let needs_plan_insights = explain_ctx.needs_plan_insights();
        let determination_for_pushdown = if matches!(explain_ctx, ExplainContext::Pushdown) {
            Some(determination.clone())
        } else {
            None
        };

        let span = Span::current();

        let catalog_for_insights = if needs_plan_insights {
            Some(Arc::clone(&catalog))
        } else {
            None
        };
        let mut compute_instances = BTreeMap::new();
        if needs_plan_insights {
            for user_cluster in catalog.user_clusters() {
                let snapshot = ComputeInstanceSnapshot::new_without_collections(user_cluster.id);
                compute_instances.insert(user_cluster.name.clone(), snapshot);
            }
        }

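        // The outcome of the (possibly expensive) optimization step below: either a plan ready to
        // execute, or the data needed to render an EXPLAIN response.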
        enum Execution {
            Peek {
                global_lir_plan: optimize::peek::GlobalLirPlan,
                optimization_finished_at: EpochMillis,
                plan_insights_optimizer_trace: Option<OptimizerTrace>,
                insights_ctx: Option<Box<PlanInsightsContext>>,
            },
            CopyToS3 {
                global_lir_plan: optimize::copy_to::GlobalLirPlan,
                source_ids: BTreeSet<GlobalId>,
            },
            ExplainPlan {
                df_meta: DataflowMetainfo,
                explain_ctx: ExplainPlanContext,
                optimizer: optimize::peek::Optimizer,
                insights_ctx: Option<Box<PlanInsightsContext>>,
            },
            ExplainPushdown {
                imports: BTreeMap<GlobalId, mz_expr::MapFilterProject>,
                determination: TimestampDetermination<Timestamp>,
            },
        }

        let source_ids_for_closure = source_ids.clone();
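        // Run the optimizer pipeline on a blocking task so that potentially expensive
        // optimization does not stall the async executor.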
        let optimization_result = mz_ore::task::spawn_blocking(
            || "optimize peek",
            move || {
                span.in_scope(|| {
                    let _dispatch_guard = explain_ctx.dispatch_guard();

                    let raw_expr = select_plan.source.clone();

                    let pipeline = || -> Result<Either<optimize::peek::GlobalLirPlan, optimize::copy_to::GlobalLirPlan>, OptimizerError> {
                        match optimizer.as_mut() {
                            Either::Left(optimizer) => {
                                let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr.clone())?;
                                let local_mir_plan =
                                    local_mir_plan.resolve(timestamp_context.clone(), &session_meta, stats);
                                let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
                                Ok(Either::Left(global_lir_plan))
                            }
                            Either::Right(optimizer) => {
                                let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr.clone())?;
                                let local_mir_plan =
                                    local_mir_plan.resolve(timestamp_context.clone(), &session_meta, stats);
                                let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
                                Ok(Either::Right(global_lir_plan))
                            }
                        }
                    };

                    let global_lir_plan_result = pipeline();
                    let optimization_finished_at = now();

                    let create_insights_ctx = |optimizer: &optimize::peek::Optimizer, is_notice: bool| -> Option<Box<PlanInsightsContext>> {
                        if !needs_plan_insights {
                            return None;
                        }

                        let catalog = catalog_for_insights.as_ref()?;

                        let enable_re_optimize = if needs_plan_insights {
                            let opt_limit = mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION
                                .get(catalog.system_config().dyncfgs());
                            !(is_notice && optimizer.duration() > opt_limit)
                        } else {
                            false
                        };

                        Some(Box::new(PlanInsightsContext {
                            stmt: select_plan.select.as_deref().map(Clone::clone).map(Statement::Select),
                            raw_expr: raw_expr.clone(),
                            catalog: Arc::clone(catalog),
                            compute_instances,
                            target_instance: target_cluster_name,
                            metrics: optimizer.metrics().clone(),
                            finishing: optimizer.finishing().clone(),
                            optimizer_config: optimizer.config().clone(),
                            session: session_meta,
                            timestamp_context,
                            view_id: optimizer.select_id(),
                            index_id: optimizer.index_id(),
                            enable_re_optimize,
                        }))
                    };

                    match global_lir_plan_result {
                        Ok(Either::Left(global_lir_plan)) => {
                            let optimizer = optimizer.unwrap_left();
                            match explain_ctx {
                                ExplainContext::Plan(explain_ctx) => {
                                    let (_, df_meta, _) = global_lir_plan.unapply();
                                    let insights_ctx = create_insights_ctx(&optimizer, false);
                                    Ok(Execution::ExplainPlan {
                                        df_meta,
                                        explain_ctx,
                                        optimizer,
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::None => {
                                    Ok(Execution::Peek {
                                        global_lir_plan,
                                        optimization_finished_at,
                                        plan_insights_optimizer_trace: None,
                                        insights_ctx: None,
                                    })
                                }
                                ExplainContext::PlanInsightsNotice(optimizer_trace) => {
                                    let insights_ctx = create_insights_ctx(&optimizer, true);
                                    Ok(Execution::Peek {
                                        global_lir_plan,
                                        optimization_finished_at,
                                        plan_insights_optimizer_trace: Some(optimizer_trace),
                                        insights_ctx,
                                    })
                                }
                                ExplainContext::Pushdown => {
                                    let (plan, _, _) = global_lir_plan.unapply();
                                    let imports = match plan {
                                        PeekPlan::SlowPath(plan) => plan
                                            .desc
                                            .source_imports
                                            .into_iter()
                                            .filter_map(|(id, (desc, _, _upper))| {
                                                desc.arguments.operators.map(|mfp| (id, mfp))
                                            })
                                            .collect(),
                                        PeekPlan::FastPath(_) => std::collections::BTreeMap::default(),
                                    };
                                    Ok(Execution::ExplainPushdown {
                                        imports,
                                        determination: determination_for_pushdown.expect("it's present for the ExplainPushdown case"),
                                    })
                                }
                            }
                        }
                        Ok(Either::Right(global_lir_plan)) => {
                            Ok(Execution::CopyToS3 {
                                global_lir_plan,
                                source_ids: source_ids_for_closure,
                            })
                        }
                        Err(err) => {
                            if optimizer.is_right() {
                                return Err(err);
                            }
                            let optimizer = optimizer.expect_left("checked above");
                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
                                if explain_ctx.broken {
                                    tracing::error!("error while handling EXPLAIN statement: {}", err);
                                    Ok(Execution::ExplainPlan {
                                        df_meta: Default::default(),
                                        explain_ctx,
                                        optimizer,
                                        insights_ctx: None,
                                    })
                                } else {
                                    Err(err)
                                }
                            } else {
                                Err(err)
                            }
                        }
                    }
                })
            },
        )
        .await
        .map_err(|optimizer_error| {
            AdapterError::Internal(format!("internal error in optimizer: {}", optimizer_error))
        })?;

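        // Back on the async task: dispatch on the outcome of optimization.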
        match optimization_result {
            Execution::ExplainPlan {
                df_meta,
                explain_ctx,
                optimizer,
                insights_ctx,
            } => {
                let rows = coord::sequencer::explain_plan_inner(
                    session,
                    &catalog,
                    df_meta,
                    explain_ctx,
                    optimizer,
                    insights_ctx,
                )
                .await?;

                Ok(Some(ExecuteResponse::SendingRowsImmediate {
                    rows: Box::new(rows.into_row_iter()),
                }))
            }
            Execution::ExplainPushdown {
                imports,
                determination,
            } => {
                let as_of = determination.timestamp_context.antichain();
                let mz_now = determination
                    .timestamp_context
                    .timestamp()
                    .map(|t| ResultSpec::value(Datum::MzTimestamp(*t)))
                    .unwrap_or_else(ResultSpec::value_all);

                Ok(Some(
                    coord::sequencer::explain_pushdown_future_inner(
                        session,
                        &*catalog,
                        &self.storage_collections,
                        as_of,
                        mz_now,
                        imports,
                    )
                    .await
                    .await?,
                ))
            }
            Execution::Peek {
                global_lir_plan,
                optimization_finished_at: _optimization_finished_at,
                plan_insights_optimizer_trace,
                insights_ctx,
            } => {
                let (peek_plan, df_meta, typ) = global_lir_plan.unapply();

                coord::sequencer::emit_optimizer_notices(
                    &*catalog,
                    session,
                    &df_meta.optimizer_notices,
                );

                if let Some(trace) = plan_insights_optimizer_trace {
                    let target_cluster = catalog.get_cluster(target_cluster_id);
                    let features = OptimizerFeatures::from(catalog.system_config())
                        .override_from(&target_cluster.config.features());
                    let insights = trace
                        .into_plan_insights(
                            &features,
                            &catalog.for_session(session),
                            Some(select_plan.finishing.clone()),
                            Some(target_cluster),
                            df_meta.clone(),
                            insights_ctx,
                        )
                        .await?;
                    session.add_notice(AdapterNotice::PlanInsights(insights));
                }

                session
                    .metrics()
                    .query_total(&[session_type, stmt_type])
                    .inc();

                let max_result_size = catalog.system_config().max_result_size();

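                // Fast-path peeks are executed directly from the frontend; slow-path peeks, which
                // require installing a dataflow, are handed to the coordinator via
                // Command::ExecuteSlowPathPeek.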
                let response = match peek_plan {
                    PeekPlan::FastPath(fast_path_plan) => {
                        let row_set_finishing_seconds =
                            session.metrics().row_set_finishing_seconds().clone();

                        let peek_stash_read_batch_size_bytes =
                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
                                .get(catalog.system_config().dyncfgs());
                        let peek_stash_read_memory_budget_bytes =
                            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
                                .get(catalog.system_config().dyncfgs());

                        self.implement_fast_path_peek_plan(
                            fast_path_plan,
                            determination.timestamp_context.timestamp_or_default(),
                            select_plan.finishing,
                            target_cluster_id,
                            target_replica,
                            typ,
                            max_result_size,
                            max_query_result_size,
                            row_set_finishing_seconds,
                            read_holds,
                            peek_stash_read_batch_size_bytes,
                            peek_stash_read_memory_budget_bytes,
                        )
                        .await?
                    }
                    PeekPlan::SlowPath(dataflow_plan) => {
                        self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
                            dataflow_plan: Box::new(dataflow_plan),
                            determination,
                            finishing: select_plan.finishing,
                            compute_instance: target_cluster_id,
                            target_replica,
                            intermediate_result_type: typ,
                            source_ids,
                            conn_id: session.conn_id().clone(),
                            max_result_size,
                            max_query_result_size,
                            tx,
                        })
                        .await?
                    }
                };

                Ok(Some(match select_plan.copy_to {
                    None => response,
                    Some(format) => ExecuteResponse::CopyTo {
                        format,
                        resp: Box::new(response),
                    },
                }))
            }
            Execution::CopyToS3 {
                global_lir_plan,
                source_ids,
            } => {
                let (df_desc, df_meta) = global_lir_plan.unapply();

                coord::sequencer::emit_optimizer_notices(
                    &*catalog,
                    session,
                    &df_meta.optimizer_notices,
                );

                let response = self
                    .call_coordinator(|tx| Command::ExecuteCopyTo {
                        df_desc: Box::new(df_desc),
                        compute_instance: target_cluster_id,
                        target_replica,
                        source_ids,
                        conn_id: session.conn_id().clone(),
                        tx,
                    })
                    .await?;

                Ok(Some(response))
            }
        }
    }

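    /// Determines a read timestamp for the given collections and acquires the corresponding read
    /// holds, using the Coordinator's `TimestampProvider` logic but callable from the frontend.
    /// Also records timestamp-selection metrics on the session.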
    pub(crate) async fn frontend_determine_timestamp(
        &mut self,
        catalog_state: &CatalogState,
        session: &Session,
        id_bundle: &CollectionIdBundle,
        when: &QueryWhen,
        compute_instance: ComputeInstanceId,
        timeline_context: &TimelineContext,
        oracle_read_ts: Option<Timestamp>,
        real_time_recency_ts: Option<Timestamp>,
    ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
        let constraint_based = ConstraintBasedTimestampSelection::from_str(
            &CONSTRAINT_BASED_TIMESTAMP_SELECTION.get(catalog_state.system_config().dyncfgs()),
        );

        let isolation_level = session.vars().transaction_isolation();

        let (read_holds, upper) = self
            .acquire_read_holds_and_least_valid_write(id_bundle)
            .await
            .map_err(|err| {
                AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
                    err,
                    compute_instance,
                )
            })?;
        let (det, read_holds) = <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
            session,
            id_bundle,
            when,
            compute_instance,
            timeline_context,
            oracle_read_ts,
            real_time_recency_ts,
            isolation_level,
            &constraint_based,
            read_holds,
            upper.clone(),
        )?;

        session
            .metrics()
            .determine_timestamp(&[
                match det.respond_immediately() {
                    true => "true",
                    false => "false",
                },
                isolation_level.as_str(),
                &compute_instance.to_string(),
                constraint_based.as_str(),
            ])
            .inc();
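        // For strict serializable reads that cannot respond immediately, determine the timestamp
        // once more and record the difference in the
        // timestamp_difference_for_strict_serializable_ms metric.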
        if !det.respond_immediately()
            && isolation_level == &IsolationLevel::StrictSerializable
            && real_time_recency_ts.is_none()
        {
            if let Some(strict) = det.timestamp_context.timestamp() {
                let (serializable_det, _tmp_read_holds) =
                    <Coordinator as TimestampProvider>::determine_timestamp_for_inner(
                        session,
                        id_bundle,
                        when,
                        compute_instance,
                        timeline_context,
                        oracle_read_ts,
                        real_time_recency_ts,
                        isolation_level,
                        &constraint_based,
                        read_holds.clone(),
                        upper,
                    )?;
                if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
                    session
                        .metrics()
                        .timestamp_difference_for_strict_serializable_ms(&[
                            compute_instance.to_string().as_ref(),
                            constraint_based.as_str(),
                        ])
                        .observe(f64::cast_lossy(u64::from(
                            strict.saturating_sub(*serializable),
                        )));
                }
            }
        }

        Ok((det, read_holds))
    }
}